参考: 【kafka专栏】不用zookeeper怎么安装kafka集群-最新kafka3.0版本
在本专栏的之前的一篇文章《kafka3种zk的替代方案》已经为大家介绍过在kafka3.0种已经可以将zookeeper去掉。

上图中黑色代表broker(消息代理服务),褐色/蓝色代表Controller(集群控制器服务)
- 左图(kafka2.0):一个集群所有节点都是broker角色,kafka从三个broker中选举出来一个Controller控制器,控制器将集群元数据信息(比如主题分类、消费进度等)保存到zookeeper,用于集群各节点之间分布式交互。
- 右图(kafka3.0):假设一个集群有四个broker,指定三个作为Conreoller角色(蓝色),从三个Controller中选举出来一个Controller作为主控制器(褐色),其他的2个备用。zookeeper不再被需要!相关的元数据信息以kafka日志的形式存在(即:以消息队列消息的形式存在)。
- controller通信端口:9093, 作用与zk的2181端口类似 。
在搭建kafka3.0集群之前, 我们需要先做好kafka实例角色规划。(四个broker, 需要通过主动配置指定三个作为Controller, Controller需要奇数个, 这一点和zk是一样的)
| 主机名称 | ip | 角色 | node.id |
|---|---|---|---|
| kafka-vm1 | 192.168.1.111 | broker,controller | 1 |
| kafka-vm2 | 192.168.1.112 | broker,controller | 2 |
| kafka-vm3 | 192.168.1.113 | broker,controller | 3 |
| kafka-vm4 | 192.168.1.114 | broker | 4 |
- kafka3.x不再支持JDK8,建议安装JDK11或JDK17。
- 新建kafka持久化日志数据mkdir -p /data/kafka;并保证安装kafka的用户具有该目录的读写权限。
各个机器节点执行:
# 安装jdk(kafka3.x不再支持JDK8,建议安装JDK11或JDK17, 这里安装jdk11)
# 下载安装jdk11, 参考: https://blog.csdn.net/justlpf/article/details/127268046
# 下载kafka
adduser kafka
cd /opt
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
tar -xf kafka_2.12-3.3.1.tgz
chown -R kafka:kafka kafka_2.12-3.3.1*
mkdir -p /data/kafka
chown -R kafka:kafka /data/kafka
192.168.1.111 data-vm1
192.168.1.112 data-vm2
192.168.1.113 data-vm3
192.168.1.114 data-vm4
在kafka3.x版本中,使用Kraft协议代替zookeeper进行集群的Controller选举,所以要针对它进行配置。
vi /opt/kafka_2.12-3.3.1/config/kraft/server.properties
具体配置参数如下:
# data-vm1节点
node.id=1
process.roles=broker,controller
listeners=PLAINTEXT://data-vm1:9092,CONTROLLER://data-vm1:9093
advertised.listeners=PLAINTEXT://:9092
controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
log.dirs=/data/kafka/
# data-vm2节点
node.id=2
process.roles=broker,controller
listeners=PLAINTEXT://data-vm2:9092,CONTROLLER://data-vm2:9093
advertised.listeners=PLAINTEXT://:9092
controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
log.dirs=/data/kafka/
# data-vm3节点
node.id=3
process.roles=broker,controller
listeners=PLAINTEXT://data-vm3:9092,CONTROLLER://data-vm3:9093
advertised.listeners=PLAINTEXT://:9092
controller.quorum.voters=1@data-vm1:9093,2@data-vm2:9093,3@data-vm3:9093
log.dirs=/data/kafka/
node.id:这将作为集群中的节点 ID,唯一标识,按照我们事先规划好的(上文),在不同的服务器上这个值不同。其实就是kafka2.0中的broker.id,只是在3.0版本中kafka实例不再只担任broker角色,也有可能是controller角色,所以改名叫做node节点。process.roles:一个节点可以充当broker或controller或两者兼而有之。按照我们事先规划好的(上文),在不同的服务器上这个值不同。多个角色用逗号分开。listeners: broker将使用9092端口,而kraft controller控制器将使用9093端口。advertised.listeners: 这里指定kafka通过代理暴漏的地址,如果都是局域网使用,就配置PLAINTEXT://:9092即可。controller.quorum.voters:这个配置用于指定controller主控选举的投票节点,所有process.roles包含controller角色的规划节点都要参与,即:zimug1、zimug2、zimug3。其配置格式为:node.id1@host1:9093,node.id2@host2:9093log.dirs:kafka 将存储数据的日志目录,在准备工作中创建好的目录。
所有kafka节点都要按照上文中的节点规划进行配置,完成config/kraft/server.properties配置文件的修改。
生成一个唯一的集群ID(在一台kafka服务器上执行一次即可),这一个步骤是在安装kafka2.0版本的时候不存在的。
$ /opt/kafka_2.12-3.3.1/bin/kafka-storage.sh random-uuid
SzIhECn-QbCLzIuNxk1A2A
使用生成的集群ID+配置文件格式化存储目录log.dirs,
所以这一步确认配置及路径确实存在,
并且kafka用户有访问权限(检查准备工作是否做对)。
每一台主机服务器都要执行命令:
/opt/kafka_2.12-3.3.1/bin/kafka-storage.sh format \
-t SzIhECn-QbCLzIuNxk1A2A \
-c /opt/kafka_2.12-3.3.1/config/kraft/server.properties
格式化操作完成之后,log.dirs目录下多出一个Meta.properties文件,存储了当前的kafka节点的id(node.id),当前节点属于哪个集群(cluster.id)
[root@data-vm2 ~]# ll /data/kafka/
总用量 8
-rw-r--r--. 1 root root 249 10月 11 18:23 bootstrap.checkpoint
-rw-r--r--. 1 root root 86 10月 11 18:23 meta.properties
$ cat /data/kafka/meta.properties
#
#Tue Apr 12 07:39:07 CST 2022
node.id=1
version=1
cluster.id=SzIhECn-QbCLzIuNxk1A2A
zimug1 zimug2 zimug3是三台应用服务器的主机名称(参考上文中的角色规划),实现方式已经在本专栏《linux主机与ip解析》中进行了说明。将下面的命令集合保存为一个shell脚本,并赋予执行权限。执行该脚本即可启动kafka集群所有的节点,前提是:你已经按照本专栏的《集群各节点之间的ssh免密登录》安装方式做了集群各节点之间的ssh免密登录。
启动命令:
bin/kafka-server-start.sh \
/opt/kafka_2.12-3.3.1/config/kraft/server.properties
# 后台运行
nohup bin/kafka-server-start.sh \
/opt/kafka_2.12-3.3.1/config/kraft/server.properties 2>&1 &
脚本:
#!/bin/bash
kafkaServers='data-vm1 data-vm2 data-vm3'
#启动所有的kafka
for kafka in $kafkaServers
do
ssh -T $kafka <<EOF
nohup /opt/kafka_2.12-3.3.1/bin/kafka-server-start.sh /opt/kafka_2.12-3.3.1/config/kraft/server.properties 1>/dev/null 2>&1 &
EOF
echo 从节点 $kafka 启动kafka3.0...[ done ]
sleep 5
done
一键停止kafka集群各节点的脚本,与启动脚本的使用方式及原理是一样的。
停止命令:
/opt/kafka_2.12-3.3.1/bin/kafka-server-stop.sh
执行脚本:
#!/bin/bash
kafkaServers='data-vm1 data-vm2 data-vm3'
#停止所有的kafka
for kafka in $kafkaServers
do
ssh -T $kafka <<EOF
cd /opt/kafka_2.12-3.3.1
bin/kafka-server-stop.sh
EOF
echo 从节点 $kafka 停止kafka...[ done ]
sleep 5
done
[root@data-vm1 kafka_2.12-3.3.1]# bin/kafka-topics.sh \
--create \
--topic quickstart-events \
--bootstrap-server data-vm4:9092
Created topic quickstart-events.
[root@data-vm1 kafka_2.12-3.3.1]#
#
[root@data-vm1 kafka_2.12-3.3.1]# bin/kafka-topics.sh \
--create \
--topic quickstart-events \
--bootstrap-server data-vm1:9092,data-vm2:9092,data-vm3:9092
bin/kafka-topics.sh \
--list \
--bootstrap-server data-vm4:9092
#
bin/kafka-topics.sh \
--list \
--bootstrap-server data-vm1:9092,data-vm2:9092,data-vm3:9092,data-vm4:9092
[root@data-vm1 kafka_2.12-3.3.1]# bin/kafka-topics.sh \
--describe \
--topic quickstart-events \
--bootstrap-server data-vm3:9092
Topic: quickstart-events TopicId: zSOJC6wNRRGQ4MudfHLGvQ PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: quickstart-events Partition: 0 Leader: 1 Replicas: 1 Isr: 1
[root@data-vm1 kafka_2.12-3.3.1]#
[root@data-vm1 kafka_2.12-3.3.1]# bin/kafka-console-producer.sh \
--topic quickstart-events \
--bootstrap-server data-vm1:9092
# 参考: 创建并配置topic
bin/kafka-topics.sh \
--bootstrap-server localhost:9092 \
--create \
--topic my-topic \
--partitions 1 \
--replication-factor 1 \
--config max.message.bytes=64000 \
--config flush.messages=1
# ------------------------- 参考 ------------------------ #
# 1: 修改已创建topic配置
# (Overrides can also be changed or set later using the alter configs command.)
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name my-topic \
--alter \
--add-config max.message.bytes=128000
# 2: 检查已修改的topic配置是否生效
# (To check overrides set on the topic you can do)
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name my-topic \
--describe
# 3. 恢复到原来的配置
# (To remove an override you can do)
bin/kafka-configs.sh \
--bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name my-topic \
--alter \
--delete-config max.message.bytes
# 4. 增加分区数
# (To add partitions you can do)
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--alter \
--topic my_topic_name \
--partitions 40
# 5. 添加配置
# (To add configs:)
bin/kafka-configs.sh \
--bootstrap-server broker_host:port \
--entity-type topics \
--entity-name my_topic_name \
--alter \
--add-config x=y
# 6. 移除配置
# (To remove a config:)
bin/kafka-configs.sh \
--bootstrap-server broker_host:port \
--entity-type topics \
--entity-name my_topic_name \
--alter \
--delete-config x
# 7. 删除topic
# (And finally deleting a topic:)
bin/kafka-topics.sh \
--bootstrap-server broker_host:port \
--delete \
--topic my_topic_name
bin/kafka-console-consumer.sh \
--topic quickstart-events \
--from-beginning \
--bootstrap-server data-vm4:9092
# 检查消费者postition
# Checking consumer position
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group my-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
my-topic 0 2 4 2 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1
my-topic 1 2 3 1 consumer-1-029af89c-873c-4751-a720-cefd41a669d6 /127.0.0.1 consumer-1
my-topic 2 2 3 1 consumer-2-42c1abd4-e3b2-425d-a8bb-e1ea49b29bb2 /127.0.0.1 consumer-2
# list all consumer groups across all topics
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
test-consumer-group
# To view offsets, as mentioned earlier,
# we "describe" the consumer group like this:
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group my-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
topic3 0 241019 395308 154289 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
topic2 1 520678 803288 282610 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
topic3 1 241018 398817 157799 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2
topic1 0 854144 855809 1665 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
topic2 0 460537 803290 342753 consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1
topic3 2 243655 398812 155157 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4
# 更多配置参考:
# https://kafka.apache.org/32/documentation.html#uses
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看rubyzip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d
类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于
我正在尝试使用ruby和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h
我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po