草庐IT

Springboot中使用kafka

欧内的手好汗 2023-09-02 原文

首先说明,本人之前没用过zookeeper、kafka等,尚硅谷十几个小时的教程实在没有耐心看,现在我也不知道分区、副本之类的概念。用kafka只是听说他比RabbitMQ快,我也是昨天晚上刚使用,下文中若有讲错的地方或者我的理解与它的本质有偏差的地方请包涵。

此文背景的环境是windows,linux流程也差不多。

  • 解压在D盘下或者什么地方,注意不要放在桌面等绝对路径太长的地方

  • 打开config中的 zookeeper.properties,自己选择性修改clientPort,不想改也行

  • 修改config中的 server.properties

        1.取消 advertised.listeners 注释,修改kafka地址与端口。如果要集群部署,broker.id不能重复,1号机是0,2号机是1这样的。

        2.修改 zookeeper.connect 为你上面zookeeper.properties中配置的地址

  • 配置好了,尝试开启kafka。

        来到bin/windows,shift右键在此处打开cmd,输入 zookeeper-server-start.bat ../../config/zookeeper.properties,等待其启动。(注意你config的路径不要写错)

        启动完再开一个cmd,输入 kafka-server-start.bat ../../config/server.properties

如果在此环节出现问题,请查看logs中的日志,面向csdn。

        我遇到的问题是 他显示什么什么路径太长了,问题的原因是我把他放桌面上了。

  • 新建springboot项目,pom中添加依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.28</version>
    </dependency>
</dependencies>
  • 配置application.yml,写启动类,controller,创建User类,创建consumer

application.yml

spring:
  application:
    name: kafka
  kafka:
    bootstrap-servers: localhost:9092 #这个是你server.properties中配置的
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test-consumer-group #这个去config/consumer.properties中查看和修改
                                # 不过好像不一样也不影响?
server:
  port: 8001

controller

@RestController
public class ProducerController {
​
    @Autowired
    KafkaTemplate<String, String> kafka;
​
    @RequestMapping("register")
    public String register(User user) {
        String message = JSON.toJSONString(user);
        System.out.println("接收到用户信息:" + message);
        kafka.send("register", message);
        //kafka.send(String topic, @Nullable V data) {
        return "OK";
    }
}

user

@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
​
    private String id;
​
    private String name;
​
    private Integer age;
}

consumer

@Configuration
public class Consumer {
​
    @KafkaListener(topics = "register")
    public void consume(String message) {
        System.out.println("接收到消息:" + message);
        User user = JSON.parseObject(message, User.class);
        System.out.println("正在为 " + user.getName() + " 办理注册业务...");
        System.out.println("注册成功");
    }
}

此时启动springboot,然而报错了

org.springframework.context.ApplicationContextException: 
    Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry';

nested exception is java.lang.IllegalStateException:
     Topic(s) [register] is/are not present and missingTopicsFatal is true

为什么呢?

请检查zookeeper和kafka的cmd上有没有他们启动失败的消息,如果有就重新启动下,记得先开zookeeper再开kafka。

然后确认你的kafka上有没有"register"这个topic,此时我是没有的,而consumer又加了 @KafkaListener(topics = "register") 注解,于是他就启动失败了。

有两种解决方式:

1.比较傻X的方式,先将@KafkaListener注释掉,启动springboot后访问localhost:8001/register,他send的时候就会自行创建topic,再取消注释重新启动就可以了。

2.cmd方式:输入 kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic register

然后我们再启动,已经启动成功了。访问 localhost:8001/register?name=JamesBond&age=55

 

我们可以看到数据已经成功送到那里了。

然后来测试一下速度

@RequestMapping("test")
public String test() {
    System.out.println("发送开始" + System.currentTimeMillis() % 10000);
    for (int i = 0; i < 1000; i++) {
        kafka.send("test", JSON.toJSONString(new User((1289312+i)+"",
                "User" + i, (int)(Math.random() * 100), info)));
    }
    System.out.println("发送结束" + System.currentTimeMillis() % 10000);
    return "OK";
}
@KafkaListener(topics = "test")
public void test(String message) {
    System.out.println("--" + System.currentTimeMillis() % 10000 + "--");
}

console:

发送开始1267
--1384--
--1384--
...
--1715--
--1715--
发送结束1715
--1715--
--1715--
...
--1734--

对比RabbitMQ:

发送开始5692
--5766--
--5766--
...
--5973--
--5974--
发送结束5976
--5977--
--5977--
...
--6456--

kafka:

        发送结束 - 发送开始=448ms

        接收结束 - 接收开始=350ms

        整体耗时: 467ms

rabbit:

        发送结束 - 发送开始=284ms

        接收结束 - 接收开始=690ms

        整体耗时: 764ms

OK既然我会用了 我就去学一下kafka基本知识了

有关Springboot中使用kafka的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

    我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

  3. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  4. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  5. ruby - 在 Ruby 中使用匿名模块 - 2

    假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于

  6. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

    我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

  7. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  8. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  9. ruby - 使用 ruby​​ 将 HTML 转换为纯文本并维护结构/格式 - 2

    我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h

  10. ruby - 在 64 位 Snow Leopard 上使用 rvm、postgres 9.0、ruby 1.9.2-p136 安装 pg gem 时出现问题 - 2

    我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po

随机推荐