kafka的安装需要依赖于jdk,需要在服务器上提前安装好该环境,这里使用用jdk1.8。
官网地址:
较新的版本已自带Zookeeper,无需额外下载。这里使用3.2.0做演示。

注意要下载Binary downloads标签下的tgz包,Source download标签下的包为源码。无法直接运行,需要编译。
使用ssh连接工具将kafka_2.12-3.2.0.tgz这个包上传到云服务器上的一个目录。

打开命令行,进入到放有压缩包的目录,执行
tar -zxvf kafka_2.12-3.2.0.tgz
然后使用cd命令进入到/kafka_2.12-3.2.0/config/下,使用
vi server.properties
编辑配置文件。

删除listeners和advertised前方的#号,改成如下配置:
listeners=PLAINTEXT://云服务器内网ip:9092(本地访问用本地ip)
# 如果要提供外网访问则必须配置此项
advertised.listeners=PLAINTEXT://云服务器公网ip:9092(若要远程访问需配置此项为云服务器的公网ip)
# zookeeper连接地址,集群配置格式为ip:port,ip:port,ip:port
zookeeper.connect=云服务器公网ip:2181
在云服务器控制台内进入安全组页面,添加两条新的入站规则,tcp/9092和tcp/2181
先查看使用的防火墙类型iptables/firewalld
iptables操作命令
1.打开/关闭/重启防火墙
开启防火墙(重启后永久生效):chkconfig iptables on
关闭防火墙(重启后永久生效):chkconfig iptables off
开启防火墙(即时生效,重启后失效):service iptables start
关闭防火墙(即时生效,重启后失效):service iptables stop
重启防火墙:service iptables restartd
2.查看打开的端口
/etc/init.d/iptables status
3.开启端口
iptables -A INPUT -p tcp --dport 8080 -j ACCEPT
4.保存并重启防火墙
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables restart
Centos7默认安装了firewalld,如果没有安装的话,可以使用 yum install firewalld firewalld-config进行安装。
操作指令如下:
1.启动防火墙
systemctl start firewalld
2.禁用防火墙
systemctl stop firewalld
3.设置开机启动
systemctl enable firewalld
4.停止并禁用开机启动
sytemctl disable firewalld
5.重启防火墙
firewall-cmd --reload
6.查看状态
systemctl status firewalld或者 firewall-cmd --state
7.在指定区域打开端口(记得重启防火墙)
firewall-cmd --zone=public --add-port=80/tcp(永久生效再加上 --permanent)
打开tcp/9092和tcp/2181这两个端口后,重启防火墙,并查看开放的端口确实生效。
cd命令进入kafka_2.12-3.2.0目录下,执行
bin/zookeeper-server-start.sh config/zookeeper.properties
启动zookeeper,不加-daemon方便排除启动错误,新建一个shell窗口,进入该目录再执行
bin/kafka-server-start.sh config/server.properties
启动kafka,若打印日志未报错,若未出现error日志,说明启动成功。
查询kafka下所有的topic
bin/kafka-topics.sh --list --zookeeper ip:port
因为kafka使用zookeeper作为配置中心,一些topic信息需要查询该kafka对应的zookeeper
创建topic
bin/kafka-topics.sh --create --zookeeper ip:port --replication-factor 1 --partitions 1 --topic test
开启生产者
bin/kafka-console-producer.sh --broker-list cos100:9092 --topic test
开启消费者
bin/kafka-console-consumer.sh --bootstrap-server cos100:9092 --topic test
在pom.xml文件中引入kafka依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>
在application.yml配置文件中配置kafka
server:
port: 8080
spring:
kafka:
bootstrap-servers: 云服务器外网ip地址:9092
producer: # 生产者
retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
batch-size: 16384
buffer-memory: 33554432
acks: 1
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT
# TIME | COUNT 有一个条件满足时提交
# COUNT_TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
# MANUAL
# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
# MANUAL_IMMEDIATE
ack-mode: manual_immediate
生产者
@RestController
public class KafkaController {
private final static String TOPIC_NAME = "test-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public String send(@RequestParam("msg") String msg) {
kafkaTemplate.send(TOPIC_NAME, "key", msg);
return String.format("消息 %s 发送成功!", msg);
}
}
消费者
@Component
public class DemoConsumer {
/**
* @param record record
* @KafkaListener(groupId = "testGroup", topicPartitions = {
* @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
* @TopicPartition(topic = "topic2", partitions = "0",
* partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
* },concurrency = "6")
* //concurrency就是同组下的消费者个数,就是并发消费数,必须小于等于分区总数
*/
@KafkaListener(topics = "test-topic", groupId = "testGroup1")
public void listentestGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println("testGroup1 message: " + value);
System.out.println("testGroup1 record: " + record);
//手动提交offset,一般是提交一个banch,幂等性防止重复消息
// === 每条消费完确认性能不好!
ack.acknowledge();
}
//配置多个消费组
@KafkaListener(topics = "test--topic", groupId = "testGroup2")
public void listentestGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println("testGroup2 message: " + value);
System.out.println("testGroup2 record: " + record);
//手动提交offset
ack.acknowledge();
}
}
使用swagger测试发送消息

控制台打印消息

我正在尝试使用ruby和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我
我想安装一个带有一些身份验证的私有(private)Rubygem服务器。我希望能够使用公共(public)Ubuntu服务器托管内部gem。我读到了http://docs.rubygems.org/read/chapter/18.但是那个没有身份验证-如我所见。然后我读到了https://github.com/cwninja/geminabox.但是当我使用基本身份验证(他们在他们的Wiki中有)时,它会提示从我的服务器获取源。所以。如何制作带有身份验证的私有(private)Rubygem服务器?这是不可能的吗?谢谢。编辑:Geminabox问题。我尝试“捆绑”以安装新的gem..
我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po
我打算为ruby脚本创建一个安装程序,但我希望能够确保机器安装了RVM。有没有一种方法可以完全离线安装RVM并且不引人注目(通过不引人注目,就像创建一个可以做所有事情的脚本而不是要求用户向他们的bash_profile或bashrc添加一些东西)我不是要脚本本身,只是一个关于如何走这条路的快速指针(如果可能的话)。我们还研究了这个很有帮助的问题:RVM-isthereawayforsimpleofflineinstall?但有点误导,因为答案只向我们展示了如何离线在RVM中安装ruby。我们需要能够离线安装RVM本身,并查看脚本https://raw.github.com/wayn
我有一个奇怪的问题:我在rvm上安装了rubyonrails。一切正常,我可以创建项目。但是在我输入“railsnew”时重新启动后,我有“程序'rails'当前未安装。”。SystemUbuntu12.04ruby-v"1.9.3p194"gemlistactionmailer(3.2.5)actionpack(3.2.5)activemodel(3.2.5)activerecord(3.2.5)activeresource(3.2.5)activesupport(3.2.5)arel(3.0.2)builder(3.0.0)bundler(1.1.4)coffee-rails(
我刚刚为fedora安装了emacs。我想用emacs编写ruby。为ruby提供代码提示、代码完成类型功能所需的工具、扩展是什么? 最佳答案 ruby-mode已经包含在Emacs23之后的版本中。不过,它也可以通过ELPA获得。您可能感兴趣的其他一些事情是集成RVM、feature-mode(Cucumber)、rspec-mode、ruby-electric、inf-ruby、rinari(用于Rails)等。这是我当前用于Ruby开发的Emacs配置:https://github.com/citizen428/emacs
我正在尝试在我的centos服务器上安装therubyracer,但遇到了麻烦。$geminstalltherubyracerBuildingnativeextensions.Thiscouldtakeawhile...ERROR:Errorinstallingtherubyracer:ERROR:Failedtobuildgemnativeextension./usr/local/rvm/rubies/ruby-1.9.3-p125/bin/rubyextconf.rbcheckingformain()in-lpthread...yescheckingforv8.h...no***e
我的最终目标是安装当前版本的RubyonRails。我在OSXMountainLion上运行。到目前为止,这是我的过程:已安装的RVM$\curl-Lhttps://get.rvm.io|bash-sstable检查已知(我假设已批准)安装$rvmlistknown我看到当前的稳定版本可用[ruby-]2.0.0[-p247]输入命令安装$rvminstall2.0.0-p247注意:我也试过这些安装命令$rvminstallruby-2.0.0-p247$rvminstallruby=2.0.0-p247我很快就无处可去了。结果:$rvminstall2.0.0-p247Search
我实际上是在尝试使用RVM在我的OSX10.7.5上更新ruby,并在输入以下命令后:rvminstallruby我得到了以下回复:Searchingforbinaryrubies,thismighttakesometime.Checkingrequirementsforosx.Installingrequirementsforosx.Updatingsystem.......Errorrunning'requirements_osx_brew_update_systemruby-2.0.0-p247',pleaseread/Users/username/.rvm/log/138121
由于fast-stemmer的问题,我很难安装我想要的任何rubygem。我把我得到的错误放在下面。Buildingnativeextensions.Thiscouldtakeawhile...ERROR:Errorinstallingfast-stemmer:ERROR:Failedtobuildgemnativeextension./System/Library/Frameworks/Ruby.framework/Versions/2.0/usr/bin/rubyextconf.rbcreatingMakefilemake"DESTDIR="cleanmake"DESTDIR=