Linux 安装 Kafka 并开启 SASL/PLAINTEXT 认证
在之前的开发工作中,需要开发使用用户名密码的方式连接 Kafka 并对 Kafka 数据进行处理,但是客户并没有提供可以测试的环境,于是就自己着手搭建了一套单节点的 Kafka 并开启 SASL 认证。
| 组件 | 版本 |
|---|---|
| kafka | 2.11-2.22 |
| zookeeper | 3.6.2 |
# 将下载的 zookeeper 和 kafka 包上传到 /opt/software 目录下
mkdir -p /opt/software
# 组件的安装目录
mkdir -p /opt/module
# 组件数据存放目录
mkdir -p /opt/data
# 解压zookeeper文件到/opt/modele/目录下
cd /opt/software
tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz -C /opt/module/
# 进入解压后的文件
cd /opt/module/apache-zookeeper-3.6.2-bin
# ll 可以发现文件夹结构如下
ll
drwxr-xr-x 2 root root 4096 Sep 4 2020 bin
drwxr-xr-x 2 root root 4096 Sep 4 2020 conf
drwxr-xr-x 5 root root 4096 Sep 4 2020 docs
drwxr-xr-x 2 root root 4096 Jul 6 11:27 lib
-rw-r--r-- 1 root root 11358 Sep 4 2020 LICENSE.txt
-rw-r--r-- 1 root root 432 Sep 4 2020 NOTICE.txt
-rw-r--r-- 1 root root 1963 Sep 4 2020 README.md
-rw-r--r-- 1 root root 3166 Sep 4 2020 README_packaging.md
# 创建zk配置文件
cd conf/
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
# 修改如下配置(数据存储文件,自定义都行,不修改也行)
dataDir=/tmp/zookeeper => dataDir=/opt/data/zookeeper
# 记得创建刚才配置的地址
mkdir -p /opt/data/zookeeper
# ok 启动 zk
cd ../bin/
sh zkServer.sh start
# 执行启动命令后控制台打印如下日志
ZooKeeper JMX enabled by default
Using config: /opt/module/apache-zookeeper-3.6.2-bin/bin/../conf/zoo.cfg
Starting zookeeper ... FAILED TO START
# ok 很明显启动失败了 我们查看日志
less ../logs/zookeeper-root-server-zujian1.sdns.bigdata.suxr.sit.testpbcdci.out
#可以看到如下的日志
Problem starting AdminServer on address 0.0.0.0, port 8080 and command URL /commands
Caused by: java.io.IOException: Failed to bind to /0.0.0.0:8080
Caused by: java.net.BindException: Address already in use
# 发现 8080端口已经被占用了,这是Zookeeper 3.5 之后的新特性,AdminServer 默认使用8080端口
# 修改 AdminServer 的默认端口
vim ../conf/zoo.cfg
# 添加如下配置
admin.serverPort=8081
# 重新启动zk
sh zkServer.sh start
# 看到如下日志表示启动成功
ZooKeeper JMX enabled by default
Using config: /opt/module/apache-zookeeper-3.6.2-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
# 其他命令
sh zkServer.sh restart # 重启
sh zkServer.sh stop # 停止
sh zkServer.sh status # 查看zk运行状态
# 解压 kafka 安装包
cd /opt/software/
tar -zxvf kafka_2.11-2.2.2.tgz -C /opt/module/
# 修改 kafka 配置文件
cd /opt/module/kafka_2.11-2.2.2/config
vim server.properties
# broker 服务器要监听的地址及端口
listeners=PLAINTEXT://hostname:9092
# kafka消息存放的路径
log.dirs=/opt/data/kafka/kafka-logs
# zk 地址
zookeeper.connect=localhost:2181/kafka
# 创建如上的文件夹
mkdir -p /opt/data/kafka/kafka-logs
# 启动 kafka
cd ../bin/
sh kafka-server-start.sh ../config/server.properties
# 查看日志,如果启动成功,则终止前台程序,改为守护进程启动
sh kafka-server-start.sh -daemon ../config/server.properties
# 查看启动日志
tail 200f ../logs/kafkaServer.out
# 创建 topic (这里需要使用主机名,使用IP会报错,我也不知道为什么)
./kafka-topics.sh --create --bootstrap-server hostname:9092 --replication-factor 1 --partitions 1 --topic topic-test
# 经过一番研究,上面的bug解决了,修改配置文件,详情查看博客(https://blog.csdn.net/chenfeng_sky/article/details/103124473)
vim ../config/server.properties
advertised.host.name=ip
# 查看 topic
./kafka-topics.sh --bootstrap-server hostname:9092 --list
# 查看 topic 详细信息
./kafka-topics.sh --describe --bootstrap-server hostname:9092 --topic topic-test
# 启动生产者
./kafka-console-producer.sh --broker-list hostname:9092 --topic topic-test
# 开一个linux新窗口 启动消费者
./kafka-console-consumer.sh --bootstrap-server hostname:9092 --from-beginning --topic topic-test
# 往生产者中输入消息,可以看到在消费者中会打印消息,到这里,kafka 安装完成
# 开启认证功能,注意是 阿拉伯数字的 1,而不是英文字母 L。
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
# 设置认证方式为sasl
requireClientAuthScheme=sasl
# 客户端开启 SASL
zookeeper.sasl.client=true
jaasLoginRenew=3600000
cd ../conf
vim zk_server_jaas.conf
# 添加如下内容
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="zookeeper-admin-pwd"
user_kafka="kafka-zookeeper-pwd";
};
Server字段用于指定服务端登录配置。通过org.apache.org.apache.kafka.common.security.plain.PlainLoginModule 由指定采用 PLAIN 机制。
定义了两个用户,username 和paasword 是 zk集群之间的认证密码,user_kafka = “kafka"定义了一个用户"kafka”,密码是"kafka-zookeeper-pwd", 通过“ user_ "为前缀后接用户名方式创建连接代理的用户名和密码。
从上面的配置可以看出,Zookeeper的认证机制是使用插件 “org.apache.kafka.common.security.plain.PlainLoginModule”,所以需要导入Kafka相关jar包,kafka-clients相关jar包,在kafka服务下的lib目录中可以找到,根据kafka不同版本,相关jar包版本会有所变化。
cp /opt/module/kafka_2.11-2.2.2/libs/kafka-clients-2.2.2.jar /opt/module/apache-zookeeper-3.6.2-bin/lib/
cp /opt/module/kafka_2.11-2.2.2/libs/lz4-java-1.5.0.jar /opt/module/apache-zookeeper-3.6.2-bin/lib/
cd /opt/module/apache-zookeeper-3.6.2-bin/bin/
vim zkEnv.sh
# 添加如下变量
export SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/opt/module/apache-zookeeper-3.6.2-bin/conf/zk_server_jaas.conf"
/opt/module/apache-zookeeper-3.6.2-bin/bin/zkServer.sh stop
/opt/module/apache-zookeeper-3.6.2-bin/bin/zkServer.sh start
/opt/module/apache-zookeeper-3.6.2-bin/bin/zkServer.sh status
cd /opt/module/kafka_2.11-2.2.2/config/
vim kafka_server_jaas.conf
# 添加如下内容
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="kafka-admin-pwd"
user_admin="kafka-admin-pwd"
user_kafka_client="kafka-server-pwd";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka-zookeeper-pwd";
};
KafkaServer,使用 user_ 来定义多个用户,供客户端程序(生产者、消费者程序)认证使用,可以定义多个,后续配置还可以根据不同的用户定义ACL。username 和password 的值必须在 user_*中有配置,且用户名密码一致,否则kafka启动就会报错。
Client 配置 kafka 和 zookeeper 通信用的,从上文的Zookeeper JAAS文件中选择一个用户,填写用户名和密码即可。
vim ../bin/kafka-run-class.sh
# 添加如下内容
export KAFKA_OPTS=" -Djava.security.auth.login.config=/opt/module/kafka_2.11-2.2.2/config/kafka_server_jaas.conf"
listeners=SASL_PLAINTEXT://10.98.0.116:9092
advertised.listeners=SASL_PLAINTEXT://10.98.0.116:9092
# 用于在代理之间进行通信的安全协议。有效值为:PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL。
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=false
# super.users=User:*的值和kafka_server_jaas.conf中KafkaServer的username的值保持一致
super.users=User:admin
# 配置 kafka 使用 zookeeper 的 ACL
zookeeper.set.acl=true
../bin/kafka-server-stop.sh
../bin/kafka-server-start.sh -daemon ../config/server.properties
../bin/kafka-topics.sh --zookeeper hostname:2181 --list
../bin/kafka-topics.sh --bootstrap-server hostname:9092 --list
此时我们会发现,使用 --bootstrap-server 命令卡住不动了,我们可以查看一下日志
tail -200f /opt/module/kafka_2.11-2.2.2/logs/server.log
# 可以看到一直在打印异常信息
[2022-07-06 19:16:07,363] INFO [SocketServer brokerId=0] Failed authentication with /hostname (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)
# 可以发现,SASL握手失败,brokerId=0 的 broker 认证失败了,这说明 SASL 认证开启成功了,我们和 broker 通信需要经过认证,而访问 zk 时,kafka 会将认证信息携带过去。所以我们在访问 broker 的时候也将认证信息传递过去可以了
vim /opt/module/kafka_2.11-2.2.2/config/sasl.properties
# 添加如下认证信息,需要注意的是,username 和 password 就是之前 KafkaServer 中配置好的用户名密码。在这里先使用超级用户,因为别的用户都涉及到 ACL 授权的事,这个后面再说,超级用户不受ACL权限控制。
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="kafka-admin-pwd";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
# 携带认证信息查看 Topic
./kafka-topics.sh --bootstrap-server hostname:9092 --list --command-config ../config/sasl.properties
# 启动生产者
./kafka-console-producer.sh --broker-list hostname:9092 --topic topic-test -producer.config ../config/sasl.properties
# 开一个linux新窗口 启动消费者
./kafka-console-consumer.sh --bootstrap-server hostname:9092 --from-beginning --topic topic-test -consumer.config ../config/sasl.properties
在生产中我们使用生产者和消费者,大都使用下面的方式进行配置。
# 在 kafka_server_jaas.conf 中添加 KafkaClient 的认证信息
vim ../config/kafka_server_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="kafka-admin-pwd";
};
# 修改 consumer.properties 和 producer.properties 配置文件,添加如下内容
cat > ../config/consumer.properties <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
EOF
cat > ../config/producer.properties <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
EOF
配置好了之后,启动命令如下
# 启动生产者
./kafka-console-producer.sh --broker-list hostname:9092 --topic topic-test -producer.config ../config/producer.properties
# 开一个linux新窗口 启动消费者
./kafka-console-consumer.sh --bootstrap-server hostname:9092 --from-beginning --topic topic-test -consumer.config ../config/consumer.properties
后续还有 ACL 相关问题,我自己也还没有搞懂。。。
又稍微的研究了一下zk和kafka的ACL,有点懂了,下面就带大家来进行配置。
在我们开始之前,我们可以先阅读一下下面的内容
首先我们来看一下官网的这一段话
If you are running a version of Kafka that does not support security or simply with security disabled, and you want to make the cluster secure, then you need to execute the following steps to enable ZooKeeper authentication with minimal disruption to your operations:
- Perform a rolling restart setting the JAAS login file, which enables brokers to authenticate. At the end of the rolling restart, brokers are able to manipulate znodes with strict ACLs, but they will not create znodes with those ACLs
- Perform a second rolling restart of brokers, this time setting the configuration parameter zookeeper.set.acl to true, which enables the use of secure ACLs when creating znodes
- Execute the ZkSecurityMigrator tool. To execute the tool, there is this script: ./bin/zookeeper-security-migration.sh with zookeeper.acl set to secure. This tool traverses the corresponding sub-trees changing the ACLs of the znodes
所以,我们要开启 Zookeeper 的 ACL 身份认证功能,执行一下的步骤即可
1、配置 jaas 文件,使得 kafka broker 需要认证操作 zk 中的节点。这个我们在之前开启 SASL 已经配置完成了。
2、在 kafka 配置文件(server.properties)中添加如下的配置
# 配置 kafka 使用 zookeeper 的 ACL
zookeeper.set.acl=true
3、执行如下脚本,将 zookeeper.acl 设置为 secure。此工具会遍历相应的子节点,更改 znode 的 ACL。
./zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181/kafka
在执行前,我们可以通过 zk 客户端来查看一下 kafka 节点的 ACL
/opt/module/apache-zookeeper-3.6.2-bin/bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 3] getAcl /kafka
'world,'anyone
: cdrwa
[zk: localhost:2181(CONNECTED) 4]
# 可以看到是默认的权限,然后我们执行上述脚本,再次查看发现节点ACL已经发生改变,表示经过 sasl 认证的kafka 用户具有 cdrwa 权限,其他所有用户都只有 r 权限。
[zk: localhost:2181(CONNECTED) 5] getAcl /kafka
'sasl,'kafka
: cdrwa
'world,'anyone
: r
[zk: localhost:2181(CONNECTED) 6]
经过我的测试,我发现如果在 zookeeper.set.acl=true 配置完成后,zk 中没有 kafka 的节点信息,那么启动 kafka 后,zk 中生成的节点信息,自动就修改了 ACL。
在开始之前,我们简单学习下Kafka ACL的格式。根据官网的介绍,Kafka中一条ACL的格式如下:“Principal P is [Allowed/Denied] Operation O From Host H On Resource R”。它们的含义如下:
ok,在之前的配置中,我们配置 kafkaClient 都是使用的超级用户,跳过了 ACL,现在我们将用户修改回来。
vim /opt/module/kafka_2.11-2.2.2/config/sasl.properties
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka_client" password="kafka-server-pwd";
vim ../config/kafka_server_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka_client"
password="kafka-server-pwd";
};
配置好了之后,我们重启服务,再次启动生产者和消费者,此时我们会发现,我们当前没有权限操作 topic 了
[root@hostname bin]# ./kafka-console-consumer.sh --bootstrap-server hostname:9092 --topic topic-test -consumer.config ../config/consumer.properties
[2022-07-07 12:16:27,284] WARN [Consumer clientId=consumer-1, groupId=test-consumer-group] Error while fetching metadata with correlation id 2 : {topic-test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2022-07-07 12:16:27,285] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [topic-test]
Processed a total of 0 messages
[root@hostname bin]#
我们可以通过,如下命令来查看 topic 的权限信息
# 查看所有权限
./kafka-acls.sh --authorizer-properties zookeeper.connect=hostname:2181/kafka --list
# 查看单个
./kafka-acls.sh --authorizer-properties zookeeper.connect=hostname:2181/kafka --list --topic topic-test
然后可以通过如下命令来对topic进行acl
# 给用户kafka_client topic topic-test 的 producer 权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=hostname:2181/kafka --topic "topic-test" --add --allow-principal User:kafka_client --producer
# 给用户kafka_client 所有消费者组的 topic topic-test 的 consumer 权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=hostname:2181/kafka --topic "topic-test" --add --allow-principal User:kafka_client --consumer --group "*"
配置好了之后,kafka_client 用户对 topic-test topic 就有了正常的生产消费权限,如下
[root@hostname bin]# ./kafka-acls.sh --authorizer-properties zookeeper.connect=hostname:2181/kafka --list
Current ACLs for resource `Topic:LITERAL:topic-test`:
User:kafka_client has Allow permission for operations: Write from hosts: *
User:kafka_client has Allow permission for operations: Describe from hosts: *
User:kafka_client has Allow permission for operations: Create from hosts: *
User:kafka_client has Allow permission for operations: Read from hosts: *
ACL 还有很多中限制,就看大家的实际需求去配置就行。
我正在尝试编写一个将文件上传到AWS并公开该文件的Ruby脚本。我做了以下事情:s3=Aws::S3::Resource.new(credentials:Aws::Credentials.new(KEY,SECRET),region:'us-west-2')obj=s3.bucket('stg-db').object('key')obj.upload_file(filename)这似乎工作正常,除了该文件不是公开可用的,而且我无法获得它的公共(public)URL。但是当我登录到S3时,我可以正常查看我的文件。为了使其公开可用,我将最后一行更改为obj.upload_file(file
简单代码require'net/http'url=URI.parse('getjson/otherdatahere[link]')req=Net::HTTP::Get.new(url.to_s)res=Net::HTTP.start(url.host,url.port){|http|http.request(req)}putsres.body只是想知道如何在phpcURL中放置身份验证token,我是这样做的 curl_setopt($ch,CURLOPT_HTTPHEADER,array('Authorization:Bearerxxx'));//Bearertokenfora
谁能提供一个使用HTTParty和digestauth的例子?我在网上找不到例子,希望有人能提供一些帮助。谢谢。 最佳答案 您可以在定义类时使用digest_auth方法设置用户名和密码classFooincludeHTTPartydigest_auth'username','password'end 关于ruby-HTTParty摘要认证,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questi
联通家庭宽带开启ipv6废话不多,直接开干首先登录联通光猫的后台,机身有写我的是http://192.168.1.1/CU.html广州家庭宽带账号密码CUAdmincuadmin00259e这里好像是默认设置,不大记得了。保存就好然后登陆路由器后台,我的是tp-link选桥模式这里的桥模式跟光猫的wan类型应该是互斥关系,光猫设置桥接,路由器就要设pppoe拨号,我没试过。然后在系统就可以查看了或者直接访问这个网址http://testipv6.com/注意1:如果你是便宜小米红米之类的路由器,他好像是默认不打开这个ipv6防火墙的,也就是你可能在系统上看到有ipv6地址,但实际上是不互通的
华为认证分等级的,相当于初中高三个等级,当然高级是比较难考的,也是含金量最高的。我就慢慢给你介绍一下。1.了解华为认证华为认证网络工程师是由华为公司认证与采购部推出的独立认证体系,与之前的华为认证不同,简称HCIA。同时华为认证是华为技术有限公司凭借多年信息通信技术人才培养经验,以及对行业发展的理解,以层次化的职业技术认证为指引,推出的覆盖IP、IT、CT以及ICT融合技术领域的认证体系,是ICT全技术领域认证体系。2.怎么考取华为认证网络工程师?要考取华为认证网络工程师必须选择最近的Prometric授权考试中心APTC报名并参加GB0-190的考试,考试通过后,以获得由华为统一签发的“华
一、解决痛点使用spring-kafka客户端,每次新增topic主题,都需要硬编码客户端并重新发布服务,操作麻烦耗时长。kafkaListener虽可以支持通配符消费topic,缺点是并发数需要手动改并且重启服务。对于业务逻辑相似场景,创建新主题动态监听可以用kafka-batch-starter组件二、组件能力1、新增topic名称为:auto.topic1(由于配置spring.kafka.consumer.prefix为auto,因此只有auto前缀的topic,才会被组件动态监听。)2、应用输出日志,监听到新增auto.topic1,并初始化客户端(主题刷新间隔为10s)3、发新的消
目前6月DAMA-CDGP数据治理认证考试开放报名地区有:北京、上海、广州、深圳、长沙、呼和浩特。目前南京、济南、西安、杭州等地区还在接近开考人数中,打算参加6月考试的朋友们可以抓紧时间报名啦!!!5月初,DAMA-CDGA/CDGP数据治理认证考前班也即将开班啦!报名从速!!!DAMA认证为数据管理专业人士提供职业目标晋升规划,彰显了职业发展里程碑及发展阶梯定义,帮助数据管理从业人士获得企业数字化转型战略下的必备职业能力,促进开展工作实践应用及实际问题解决,形成企业所需的新数字经济下的核心职业竞争能力。DAMA是数据管理方面的认证,帮助数据从业者提升数据管理能力。CDGP数据治理专家认证属于
我只是想比较在Rails中实现ACL时使用的不同解决方案。 最佳答案 我使用授权插件(由BillKatz创建):Rolescanbeauthorizedfortheentireapplication,amodelclass,oraspecificobject.Thepluginprovidesawayofcheckingauthorizationattheclassorinstancemethodlevelusingpermitandpermit?methods.Italsoprovidesenglish-likedynamicme
我正在尝试通过mongoid.yml进行身份验证,但我想要进行身份验证的用户在admin数据库中。如果我尝试在数据库字段中指定管理数据库,它也会将所有集合放在那里,我不希望这样。有没有办法将我的数据库字段设置为我想要的数据库,但指定一个单独的身份验证数据库?这是我的mongoid.yml文件development:sessions:default:database:XC_DEVhosts:-IP:PORTusername:userpassword:password 最佳答案 这个问题已经存在一年多了,但值得回答。是的,auth_so
在我的Rails应用中,我在创建时将客户RMA运输标签保存到S3存储桶中。我刚刚更新到aws-sdkgem的V2,现在我设置ACL的代码不起作用。在V1.X中有效的代码:#SaveslabeltoS3buckets3=AWS::S3.newobj=s3.buckets[ENV['S3_BUCKET_NAME']].objects["#{shippinglabel_filename}"]obj.write(open(label.label('pdf').postage_label.label_pdf_url,'rb'),:acl=>:public_read).write似乎已被弃用,所以