| 名称 | 解释 |
|---|---|
| broker | 消息中间件处理节点,一个broker就是一个kafka节点,多个broker构成一个kafka集群。 |
| topic | kafka根据消息进行分类,发布到kafka的每个消息都有一个对应的topic |
| producer | 消息生产(发布)者 |
| consumer | 消息消费(订阅)者 |
| consumergroup | 消息订阅集群,一个消息可以被多个consumergroup消费,但是一个consumergroup只有一个consumer可以消费消息。 |
当前环境:centos7.9三台
软件版本:kafka_2.13-3.0.0
环境目录:/usr/local/kafka
下载kafka;包含了zookeeper(三台机器都要操作)
[root@localhost opt]# wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz
[root@localhost opt]# tar zxvf kafka_2.13-3.0.0.tgz
[root@localhost opt]# mv kafka_2.13-3.0.0 /usr/local/kafka
配置环境变量(三台机器都要操作)
[root@localhost opt]# vim /etc/profile
## 末尾添加
export ZOOKEEPER_HOME=/usr/local/kafka
export PATH=$PATH:$ZOOKEEPER_HOME/bin
## 加载环境变量
[root@localhost opt]# source /etc/profile
由于Kafka分区位置和主题配置之类的元数据都存储在ZooKeeper群集中,所以要搭建kafka集群环境必须先安装zookeeper群集
zookeeper配置文件修改(三台配置一样)
[root@localhost ~]# cd /usr/local/kafka/
[root@localhost kafka]# vim config/zookeeper.properties
dataDir=/tmp/zookeeper ## 主要用来配置zookeeper server数据的存放路径
clientPort=2181 ## zookeeper服务端口
tickTime=2000 ## zookeeper客户端与服务器之间的心跳时间。默认值为2000毫秒,即2秒
initLimit=10 ## Follower连接到Leader并同步数据的最大时间
syncLimit=5 ## Follower同步Leader的最大时间
maxClientCnxns=0 ## 客户端最大连接数,设置0或者不设置表示取消连接数限制
admin.enableServer=false ## 禁用 Admin Server
server.0=192.168.1.13:2888:3888 ## 配置zookeeper群集
server.1=192.168.1.108:2888:3888
server.2=192.168.1.143:2888:3888
创建myid文件(三台机器都要操作)
192.168.1.13机器上
[root@localhost kafka]# mkdir -p /opt/zookeeper;echo "0" > /tmp/zookeeper/myid
192.168.1.108机器上
[root@localhost kafka]# mkdir -p /opt/zookeeper;echo "1" > /tmp/zookeeper/myid
192.168.1.143机器上
[root@localhost kafka]# mkdir -p /opt/zookeeper;echo "2" > /tmp/zookeeper/myid
创建system启动文件
[root@localhost kafka]# vim /usr/lib/systemd/system/zookeeper.service
[Unit]
Description=zookeeper
After=network.target
Wants=network.target
[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/app/idk/bin"
User=root
Group=root
ExecStart=/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties
Restart=always
[Install]
WantedBy=multi-user.target
启动zookeeper
[root@localhost kafka]# systemctl enable zookeeper.service
[root@localhost kafka]# systemctl start zookeeper.service
kafka配置文件修改
[root@localhost ~]# cd /usr/local/kafka/
[root@localhost kafka]# vim config/server.properties (直接复制到另外两台)
broker.id=1 ## 第一台1,第二台为2 第三台为3
listeners=PLAINTEXT://:9092 ## 监听IP地址和端口
advertised.listeners=PLAINTEXT://192.168.1.13:9092 ## 配置kafka的broker ip
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=404857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.13:2181,192.168.1.108:2181,192.168.1.143:2181 ##连接zookeeper
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
## 修改192.168.1.108的配置
broker.id=2
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.1.108:9092
修改192.168.1.143的配置
broker.id=3
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.1.143:9092
## 修改JVM参数 (根据需求修改)
kafka_heap_opts:指定堆大小,默认是1GB
[root@localhost kafka]# vim bin/kafka-server-start.sh
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx2G -Xms2G -XX:MaxDirectMemorySize=1G" ## 修改这一行即可
fi
创建system启动文件
[root@localhost kafka]# vim /usr/lib/systemd/system/kafka.service
[Unit]
Description=kafka
After=network.target
Wants=network.target
[Service]
Type=simple
Environment="PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/app/idk/bin"
User=root
Group=root
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
Restart=always
[Install]
WantedBy=multi-user.target
启动zookeeper
[root@localhost kafka]# systemctl enable kafka.service
[root@localhost kafka]# systemctl start kafka.service
/usr/local/kafka/bin/kafka-run-class.sh: line 309: exec: java: not found
第一种: 修改配置文件中的Java环境
## 修改kafka-run-class.sh 配置文件
[root@localhost kafka]# vim bin/kafka-run-class.sh
## 找到如下:
# Which java to use
if [ -z "$JAVA_HOME" ]; then
JAVA="/usr/local/jdk/bin/java" ## 修改到绝对路径
else
JAVA="$JAVA_HOME/bin/java"
fi
第二种: 做个软连接
[root@localhost kafka]# echo $PATH
/usr/local/bin:/usr/local/sbin:/usr/bin:/usr/sbin:/bin:/sbin:/home/laserx/.local/bin:/home/laserx/bin
[root@localhost kafka]# ln -s /usr/local/jdk1.8.0_251/bin/java /usr/local/bin/java
我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c
我是Google云的新手,我正在尝试对其进行首次部署。我的第一个部署是RubyonRails项目。我基本上是在关注thisguideinthegoogleclouddocumentation.唯一的区别是我使用的是我自己的项目,而不是他们提供的“helloworld”项目。这是我的app.yaml文件runtime:customvm:trueentrypoint:bundleexecrackup-p8080-Eproductionconfig.ruresources:cpu:0.5memory_gb:1.3disk_size_gb:10当我转到我的项目目录并运行gcloudprevie
我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和
我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=
我可以在Azure网站上部署RubyonRails吗? 最佳答案 还没有。目前仅支持.NET和PHP。 关于ruby-on-rails-RubyonRails可以部署在Azure网站上吗?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/12964010/
我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi
?博客主页:https://xiaoy.blog.csdn.net?本文由呆呆敲代码的小Y原创,首发于CSDN??学习专栏推荐:Unity系统学习专栏?游戏制作专栏推荐:游戏制作?Unity实战100例专栏推荐:Unity实战100例教程?欢迎点赞?收藏⭐留言?如有错误敬请指正!?未来很长,值得我们全力奔赴更美好的生活✨------------------❤️分割线❤️-------------------------
前置步骤我们都操作完了,这篇开始介绍jenkins的集成。话不多说,看操作1、登录进入jenkins后会让你选择安装插件,选择第一个默认的就行。安装完成后设置账号密码,重新登录。2、配置JDK和Git都需要执行路径,所以需要先把执行路径找到,先进入服务器的docker容器,2.1JDK的路径root@69eef9ee86cf:/usr/bin#echo$JAVA_HOME/usr/local/openjdk-82.2Git的路径root@69eef9ee86cf:/#whichgit/usr/bin/git3、先配置JDK和Git。点击:ManageJenkins>>GlobalToolCon
MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO
深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal