文章目录
Kafka 是一个由 LinkedIn 开发的分布式消息系统,它于2011年年初开源,现在由著名的 Apache 基金会维护与开发。 Kafka 使用 Scala 实现,所以kafka发布的版本号通常含有两部分,例如kafka_2.12-3.1.0.tgz,其中,2.12为scala版本,3.1.0为kafka版本, Kafka 是基于消息发布﹣订阅模式实现的消息系统,具体实现原理参考官方文档这里不再冗余阐述。
我的应用场景:服务器上部署了两套软件,一个是负责日常事务处理的app应用,另一个是负责项目立项管理的springboot项目,为了方便用户随时查看是否有项目立项流程发起或者走到了哪个节点,项目立项这个软件需要把流程信息推送到kafka中间件,app订阅到相关消息后查询显示出来。下面讲一下具体的实现,这个功能是我在第一次接触的情况下,只用了一两天就搞定的,跟网上查到的资料相比有坑的地方会标红显示。
官网下载链接
我下载的是kafka_2.12-3.1.0.tgz 这个版本,应该是比较新的,可以兼容以前的旧版本。

kafka只需要解压下载的压缩包就行了,我这里解压的路径是D:\kafka_2.12-3.1.0。
kafka的运行需要依赖zookeeper, kafka从2.8.0版本之后就内置了zookeeper.jar文件,用命令行启动即可,不需要单独安装zookeeper了
想要启动kafka需要修改kafka配置文件和zookeeper配置文件,配置文件都在跟目录下面的config文件夹下。
kafka 服务端配置在server.properties文件中,这里需要修改两处配置:listeners 和 log.dirs
listeners:服务器监听的地址,修改如下:
listeners=PLAINTEXT://localhost:9092
log.dirs:日志文件修改为自定义的日志目录,我的是log.dirs=D:/kafka_2.12-3.1.0/logs
这里最好改一下,默认日志放在/temp路径,Linux环境因为临时文件夹temp不稳定的原因也需要改,Windows环境改到当前目录方便后面出现问题时删除日志重新启动
zookeeper配置文件为zookeeper.properties,只需修改一处:
dataDir:zookeeper存储数据的路径,Windows环境路径要用D:\\kafka3.2.1\\datas这种形式
我遇到了Kafka异常重启后提示错误:The Cluster ID XXXXX doesn‘t match stored clusterId Some(XXXXX) in meta.properties.
但我没有找到这个meta.properties文件,查了半天原因是log.dirs路径配置的不对
接下来进入测试阶段:
1. 启动zookeeper
先启动zookeeper,进入kafka安装根目录下,地址栏输入cmd,然后回车,注意启动之后不要关闭窗口。启动命令如下:
本地:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
服务器要用绝对路径: start cmd /k D:\kafka3.2.1\bin\windows\zookeeper-server-start.bat D:\kafka3.2.1\config\zookeeper.properties

没有报错就可以了
2. 启动kafka服务端
同样进入kafka安装根目录下,地址栏输入cmd,然后回车,启动之后不要关闭窗口。启动命令如下:
启动kafka-server
本地:
.\bin\windows\kafka-server-start.bat .\config\server.properties
服务端:
start cmd /k “C:\EAMServer\kafka3.2.1\bin\windows\kafka-server-start.bat C:\EAMServer\kafka3.2.1\config\server.properties”
也是没有报错就算启动成功了,如果启动kafka失败,并出现以下异常,删除logs文件夹下的meta.properties文件即可。
The Cluster ID xxxx doesn’t match stored clusterId Some(finN2zUTRWaXMomXCknRew) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.
3. 创建kafka-topics
启动zookeeper和kafka服务端这两个命令窗口是必需的,这里通过脚本创建topic通常是用于本地测试kafka服务是否能正常发布和接收消息(新手可以用脚本创建一下测测,用Java实现发送消息可自动创建topic)
同样进入kafka安装根目录下,地址栏输入cmd,然后回车,启动之后不要关闭窗口。假设创建一个名字为test的topic命令如下:
start cmd /k .\bin\windows\kafka-topics.bat --create --bootstrap-server 10.0.102.132:9092 --replication-factor 1 --partitions 1 --topic test
这里有坑:新版的主题通过kafka服务端创建即可,也就是 --bootstrap-server这个地址,网上好多资料都是旧版的连接zookeeper创建的,在新版可能报错
–partitions 1意思是建立一个分区,–replication-factor 1是配置一个副本,因为本文讲的是单节点服务所以默认一个分区,集群可设置多个。启动之后,kafka-topics处于等待创建topic状态,一段时间内如果不createTopic,kafka-topics将自动断开
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
启动生产者之后就可以发送消息了
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
启动消费者之后生产者发送的消息,消费者端就能收到了。
至此,消息队列kafka就安装完毕,完全可以通过命令行测试服务是否正常。
我用的是kafka Tool,下载下来的软件名字是Offset Explorer 2.3
用客户端工具看所有的Topic和接收的消息内容非常直观,实乃开发利器。

<!-- kafka 消息队列 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.1</version>
</dependency>
spring:
kafka:
bootstrap-servers: 10.0.102.132:9092 #指定kafka server的地址,集群配多个,中间,逗号隔开
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: am #群组ID
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
auto-offset-reset: earliest
enable-auto-commit: true
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
auto-commit-interval: 1000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
发送数据代码非常简单,开发人员基本上就关注怎么组装消息报文就行了,消息发送就一行代码:
@Autowired
KafkaTemplate kafkaTemplate; // 上面注入一个KafkaTemplate对象
... // 组装JSONData
kafkaTemplate.send(“test”, JSONData); // 直接用send方法,参数是topic名称和JSON报文数据,这行代码加到任何需要发送kafka消息的方法中
这里需要关注一个知识点:kafkaTemplate.send(“test”, JSONData)是kafka默认的异步消息发送,异步发送消息时,只要消息积累达到batch.size值或者积累消息的时间超过linger.ms(二者满足其一),producer就会把该批量的消息发送到topic中。
注:batch.size默认是16384,linger.ms默认是0,这两个参数可在springBoot项目配置文件中增加/修改。
同步发送消息时,需要在每次send()方法调用get()方法,因为每次send()方法会返回一个Future类型的值,Future的get()方法会一直阻塞,知道该线程的任务获取到返回值,即当消息发送成功。可在返回future类型后增加回调函数执行发送成功后的处理逻辑。
订阅接收消息就是要加一个@KafkaListener注解,指定Topic主题和groupId(随意取,是为了区分订阅者是谁)
@KafkaListener(topics = "test", groupId = "am")
public void onMessage(ConsumerRecord<?, ?> record){
System.out.println("消费消息,record:"+record.topic()+"-"+record.partition()+"-"+record.value());
Optional<Object> kafkaMassage = Optional.ofNullable(record.value());
if (kafkaMassage.isPresent()) {
Object o = kafkaMassage.get();
logger.info("ConsumerController.groupId[gdzc-uims-iam]:" + o);
try {
Map<String, Object> res = JSON.parseObject(o.toString(), Map.class);
.......
} catch (Exception e) {
logger.error("KafkaConsumer error", e.toString());
}
}
}
以上就是windows环境中单节点kafka中间件的配置实现过程,通过两端代码展示了发送消息和接收订阅消息的代码,实现超简单,看完本文就会使用了。具体kafka的实现原理和集群配置感兴趣的可以深入研究一下,后续我也可能会更新集群配置的文章,关注我吧。

我需要在客户计算机上运行Ruby应用程序。通常需要几天才能完成(复制大备份文件)。问题是如果启用sleep,它会中断应用程序。否则,计算机将持续运行数周,直到我下次访问为止。有什么方法可以防止执行期间休眠并让Windows在执行后休眠吗?欢迎任何疯狂的想法;-) 最佳答案 Here建议使用SetThreadExecutionStateWinAPI函数,使应用程序能够通知系统它正在使用中,从而防止系统在应用程序运行时进入休眠状态或关闭显示。像这样的东西:require'Win32API'ES_AWAYMODE_REQUIRED=0x0
我在我的Rails项目中使用Pow和powifygem。现在我尝试升级我的ruby版本(从1.9.3到2.0.0,我使用RVM)当我切换ruby版本、安装所有gem依赖项时,我通过运行railss并访问localhost:3000确保该应用程序正常运行以前,我通过使用pow访问http://my_app.dev来浏览我的应用程序。升级后,由于错误Bundler::RubyVersionMismatch:YourRubyversionis1.9.3,butyourGemfilespecified2.0.0,此url不起作用我尝试过的:重新创建pow应用程序重启pow服务器更新战俘
我正在尝试修改当前依赖于定义为activeresource的gem:s.add_dependency"activeresource","~>3.0"为了让gem与Rails4一起工作,我需要扩展依赖关系以与activeresource的版本3或4一起工作。我不想简单地添加以下内容,因为它可能会在以后引起问题:s.add_dependency"activeresource",">=3.0"有没有办法指定可接受版本的列表?~>3.0还是~>4.0? 最佳答案 根据thedocumentation,如果你想要3到4之间的所有版本,你可以这
有没有办法在这个简单的get方法中添加超时选项?我正在使用法拉第3.3。Faraday.get(url)四处寻找,我只能先发起连接后应用超时选项,然后应用超时选项。或者有什么简单的方法?这就是我现在正在做的:conn=Faraday.newresponse=conn.getdo|req|req.urlurlreq.options.timeout=2#2secondsend 最佳答案 试试这个:conn=Faraday.newdo|conn|conn.options.timeout=20endresponse=conn.get(url
我是Google云的新手,我正在尝试对其进行首次部署。我的第一个部署是RubyonRails项目。我基本上是在关注thisguideinthegoogleclouddocumentation.唯一的区别是我使用的是我自己的项目,而不是他们提供的“helloworld”项目。这是我的app.yaml文件runtime:customvm:trueentrypoint:bundleexecrackup-p8080-Eproductionconfig.ruresources:cpu:0.5memory_gb:1.3disk_size_gb:10当我转到我的项目目录并运行gcloudprevie
如果我使用ruby版本2.5.1和Rails版本2.3.18会怎样?我有基于rails2.3.18和ruby1.9.2p320构建的rails应用程序,我只想升级ruby的版本,而不是rails,这可能吗?我必须面对哪些挑战? 最佳答案 GitHub维护apublicfork它有针对旧Rails版本的分支,有各种变化,它们一直在运行。有一段时间,他们在较新的Ruby版本上运行较旧的Rails版本,而不是最初支持的版本,因此您可能会发现一些关于需要向后移植的有用提示。不过,他们现在已经有几年没有使用2.3了,所以充其量只能让更
我安装了ruby版本管理器,并将RVM安装的ruby实现设置为默认值,这样'哪个ruby'显示'~/.rvm/ruby-1.8.6-p383/bin/ruby'但是当我在emacs中打开inf-ruby缓冲区时,它使用安装在/usr/bin中的ruby。有没有办法让emacs像shell一样尊重ruby的路径?谢谢! 最佳答案 我创建了一个emacs扩展来将rvm集成到emacs中。如果您有兴趣,可以在这里获取:http://github.com/senny/rvm.el
这似乎非常适得其反,因为太多的gem会在window上破裂。我一直在处理很多mysql和ruby-mysqlgem问题(gem本身发生段错误,一个名为UnixSocket的类显然在Windows机器上不能正常工作,等等)。我只是在浪费时间吗?我应该转向不同的脚本语言吗? 最佳答案 我在Windows上使用Ruby的经验很少,但是当我开始使用Ruby时,我是在Windows上,我的总体印象是它不是Windows原生系统。因此,在主要使用Windows多年之后,开始使用Ruby促使我切换回原来的系统Unix,这次是Linux。Rub
我想在Ruby中创建一个用于开发目的的极其简单的Web服务器(不,不想使用现成的解决方案)。代码如下:#!/usr/bin/rubyrequire'socket'server=TCPServer.new('127.0.0.1',8080)whileconnection=server.acceptheaders=[]length=0whileline=connection.getsheaders想法是从命令行运行这个脚本,提供另一个脚本,它将在其标准输入上获取请求,并在其标准输出上返回完整的响应。到目前为止一切顺利,但事实证明这真的很脆弱,因为它在第二个请求上中断并出现错误:/usr/b
我正在玩HTML5视频并且在ERB中有以下片段:mp4视频从在我的开发环境中运行的服务器很好地流式传输到chrome。然而firefox显示带有海报图像的视频播放器,但带有一个大X。问题似乎是mongrel不确定ogv扩展的mime类型,并且只返回text/plain,如curl所示:$curl-Ihttp://0.0.0.0:3000/pr6.ogvHTTP/1.1200OKConnection:closeDate:Mon,19Apr201012:33:50GMTLast-Modified:Sun,18Apr201012:46:07GMTContent-Type:text/plain