草庐IT

技术分享| 消息队列Kafka群集部署

anyRTC 2023-03-28 原文

一、简介

1、介绍
Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

2、主要应用场景是
  • 日志收集:可以用kafka收集各种服务的日志 ,通过已统一接口的形式开放给各种消费者。

  • 消息系统:解耦生产和消费者,缓存消息。

  • 用户活动追踪:kafka可以记录webapp或app用户的各种活动,如浏览网页,点击等活动,这些活动可以发送到kafka,然后订阅者通过订阅这些消息来做监控。

  • 运营指标:可以用于监控各种数据。

3、Kafka主要设计目标如下:
  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持线水平扩展
4、基本概念
kafka是一个分布式的分区的消息,提供消息系统应该具备的功能。

名称 解释
broker 消息中间件处理节点,一个broker就是一个kafka节点,多个broker构成一个kafka集群。
topic kafka根据消息进行分类,发布到kafka的每个消息都有一个对应的topic
producer 消息生产(发布)者
consumer 消息消费(订阅)者
consumergroup 消息订阅集群,一个消息可以被多个consumergroup消费,但是一个consumergroup只有一个consumer可以消费消息。

二、环境准备

当前环境:centos7.9三台 软件版本:kafka_2.13-3.0.0 环境目录:/usr/local/kafka 下载kafka;包含了zookeeper(三台机器都要操作)

[root@localhost opt]# wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz [root@localhost opt]# tar zxvf kafka_2.13-3.0.0.tgz [root@localhost opt]# mv kafka_2.13-3.0.0 /usr/local/kafka 配置环境变量(三台机器都要操作)

[root@localhost opt]# vim /etc/profile ## 末尾添加 export ZOOKEEPER_HOME=/usr/local/kafka export PATH=$PATH:$ZOOKEEPER_HOME/bin ## 加载环境变量 [root@localhost opt]# source /etc/profile

三、Zookeeper的安装

由于Kafka分区位置和主题配置之类的元数据都存储在ZooKeeper群集中,所以要搭建kafka集群环境必须先安装zookeeper群集

zookeeper配置文件修改(三台配置一样)

[root@localhost ~]# cd /usr/local/kafka/ [root@localhost kafka]# vim config/zookeeper.properties

dataDir=/tmp/zookeeper ## 主要用来配置zookeeper server数据的存放路径 clientPort=2181 ## zookeeper服务端口 tickTime=2000 ## zookeeper客户端与服务器之间的心跳时间。默认值为2000毫秒,即2秒 initLimit=10 ## Follower连接到Leader并同步数据的最大时间 syncLimit=5 ## Follower同步Leader的最大时间 maxClientCnxns=0 ## 客户端最大连接数,设置0或者不设置表示取消连接数限制 admin.enableServer=false ## 禁用 Admin Server server.0=192.168.1.13:2888:3888 ## 配置zookeeper群集 server.1=192.168.1.108:2888:3888 server.2=192.168.1.143:2888:3888 创建myid文件(三台机器都要操作)

192.168.1.13机器上

[root@localhost kafka]# mkdir -p /opt/zookeeper;echo "0" > /tmp/zookeeper/myid 192.168.1.108机器上

[root@localhost kafka]# mkdir -p /opt/zookeeper;echo "1" > /tmp/zookeeper/myid 192.168.1.143机器上

[root@localhost kafka]# mkdir -p /opt/zookeeper;echo "2" > /tmp/zookeeper/myid 创建system启动文件

[root@localhost kafka]# vim /usr/lib/systemd/system/zookeeper.service

[Unit] Description=zookeeper After=network.target Wants=network.target [Service] Type=simple Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/app/idk/bin" User=root Group=root ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties Restart=always [Install] WantedBy=multi-user.target 启动zookeeper

[root@localhost kafka]# systemctl enable zookeeper.service [root@localhost kafka]# systemctl start zookeeper.service

四、Kafka的安装

kafka配置文件修改

[root@localhost ~]# cd /usr/local/kafka/ [root@localhost kafka]# vim config/server.properties (直接复制到另外两台)

broker.id=1 ## 第一台1,第二台为2 第三台为3 listeners=PLAINTEXT://:9092 ## 监听IP地址和端口 advertised.listeners=PLAINTEXT://192.168.1.13:9092 ## 配置kafka的broker ip num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=404857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=192.168.1.13:2181,192.168.1.108:2181,192.168.1.143:2181 ##连接zookeeper zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0 ## 修改192.168.1.108的配置 broker.id=2 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://192.168.1.108:9092 修改192.168.1.143的配置 broker.id=3 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://192.168.1.143:9092 ## 修改JVM参数 (根据需求修改) kafka_heap_opts:指定堆大小,默认是1GB [root@localhost kafka]# vim bin/kafka-server-start.sh if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G -XX:MaxDirectMemorySize=1G" ## 修改这一行即可 fi 创建system启动文件

[root@localhost kafka]# vim /usr/lib/systemd/system/kafka.service

[Unit] Description=kafka After=network.target Wants=network.target [Service] Type=simple Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/app/idk/bin" User=root Group=root ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties Restart=always [Install] WantedBy=multi-user.target 启动zookeeper [root@localhost kafka]# systemctl enable kafka.service [root@localhost kafka]# systemctl start kafka.service

五、踩坑

服务启动不起来 找不到Java
/usr/local/kafka/bin/kafka-run-class.sh: line 309: exec: java: not found

第一种: 修改配置文件中的Java环境

## 修改kafka-run-class.sh 配置文件 [root@localhost kafka]# vim bin/kafka-run-class.sh ## 找到如下: # Which java to use if [ -z "$JAVA_HOME" ]; then JAVA="/usr/local/jdk/bin/java" ## 修改到绝对路径 else JAVA="$JAVA_HOME/bin/java" fi 第二种: 做个软连接

[root@localhost kafka]# echo $PATH /usr/local/bin:/usr/local/sbin:/usr/bin:/usr/sbin:/bin:/sbin:/home/laserx/.local/bin:/home/laserx/bin [root@localhost kafka]# ln -s /usr/local/jdk1.8.0_251/bin/java /usr/local/bin/java

有关技术分享| 消息队列Kafka群集部署的更多相关文章

  1. ruby-on-rails - 如何在 Rails View 上显示错误消息? - 2

    我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c

  2. ruby-on-rails - 每次我尝试部署时,我都会得到 - (gcloud.preview.app.deploy) 错误响应 : [4] DEADLINE_EXCEEDED - 2

    我是Google云的新手,我正在尝试对其进行首次部署。我的第一个部署是RubyonRails项目。我基本上是在关注thisguideinthegoogleclouddocumentation.唯一的区别是我使用的是我自己的项目,而不是他们提供的“helloworld”项目。这是我的app.yaml文件runtime:customvm:trueentrypoint:bundleexecrackup-p8080-Eproductionconfig.ruresources:cpu:0.5memory_gb:1.3disk_size_gb:10当我转到我的项目目录并运行gcloudprevie

  3. ruby - 分布式事务和队列,ruby,erlang,scala - 2

    我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和

  4. ruby - 使用 Ruby 通过 Outlook 发送消息的最简单方法是什么? - 2

    我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=

  5. ruby-on-rails - Ruby on Rails 可以部署在 Azure 网站上吗? - 2

    我可以在Azure网站上部署RubyonRails吗? 最佳答案 还没有。目前仅支持.NET和PHP。 关于ruby-on-rails-RubyonRails可以部署在Azure网站上吗?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/12964010/

  6. Ruby - 如何将消息长度表示为 2 个二进制字节 - 2

    我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi

  7. Unity 热更新技术 | (三) Lua语言基本介绍及下载安装 - 2

    ?博客主页:https://xiaoy.blog.csdn.net?本文由呆呆敲代码的小Y原创,首发于CSDN??学习专栏推荐:Unity系统学习专栏?游戏制作专栏推荐:游戏制作?Unity实战100例专栏推荐:Unity实战100例教程?欢迎点赞?收藏⭐留言?如有错误敬请指正!?未来很长,值得我们全力奔赴更美好的生活✨------------------❤️分割线❤️-------------------------

  8. jenkins部署1--jenkins+gitee持续集成 - 2

    前置步骤我们都操作完了,这篇开始介绍jenkins的集成。话不多说,看操作1、登录进入jenkins后会让你选择安装插件,选择第一个默认的就行。安装完成后设置账号密码,重新登录。2、配置JDK和Git都需要执行路径,所以需要先把执行路径找到,先进入服务器的docker容器,2.1JDK的路径root@69eef9ee86cf:/usr/bin#echo$JAVA_HOME/usr/local/openjdk-82.2Git的路径root@69eef9ee86cf:/#whichgit/usr/bin/git3、先配置JDK和Git。点击:ManageJenkins>>GlobalToolCon

  9. MIMO-OFDM无线通信技术及MATLAB实现(1)无线信道:传播和衰落 - 2

     MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO

  10. 深度学习部署:Windows安装pycocotools报错解决方法 - 2

    深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal

随机推荐