草庐IT

Kafka增加安全验证安全认证,SASL认证,并通过spring boot-Java客户端连接配置

Bruin_W 2023-11-02 原文

出发点

公司Kafka一直没做安全验证,由于是诱捕程序故需要面向外网连接,需要增加Kafka连接验证,保证Kafka不被非法连接,故开始研究Kafka安全验证
使用Kafka版本为2.4.0版本,主要参考官方文档

官网

官网对2.4版本安全验证介绍以及使用方式地址:
https://kafka.apache.org/24/documentation.html#security

具体流程

使用 SASL/PLAIN 进行身份验证
SASL/PLAIN 是一种简单的用户名/密码身份验证机制,通常与 TLS 一起使用以进行加密以实现安全身份验证。 Kafka 支持 SASL/PLAIN 的默认实现,可以扩展用于生产用途,如此处所述。

用户名用作 ACL 等配置的身份验证。
配置 Kafka 代理
将一个经过适当修改的 JAAS 文件添加到每个 Kafka 代理的配置目录中,类似于下面的文件,在这个例子中我们称之为 kafka_server_jaas.conf: 此配置定义了两个用户(admin 和 alice)。代理使用 KafkaServer 部分中的属性用户名和密码来启动与其他代理的连接。在此示例中,admin 是代理间通信的用户。属性集user_用户名定义 连接到代理的所有用户的密码,代理验证所有客户端连接,包括 来自使用这些属性的其他经纪人的人。

重要配置kafka_server_jaas.conf

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};

将 JAAS 配置文件位置作为 JVM 参数传递给每个 Kafka 代理:

注意

以下配置需要添加到Kafka启动脚本中以添加JVM虚拟机运行参数

需要改为自己的kafka_server_jaas.conf配置文件路径

-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf

我自己的Kafka的路径为/opt/kafka/bin/kafka-server-start.sh
具体内容如下
主要配置为KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"

#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ $# -lt 1 ];
then
        echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
        exit 1
fi
base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

在服务器属性中配置 SASL 端口和 SASL 机制,如此处所述。例如:

listeners=SASL_SSL://host.name:port
    security.inter.broker.protocol=SASL_SSL
    sasl.mechanism.inter.broker.protocol=PLAIN
    sasl.enabled.mechanisms=PLAIN

配置 Kafka 客户端

要在客户端上配置 SASL 身份验证,请执行以下操作:
在 producer.properties 或 consumer.properties 中为每个客户机配置 JAAS 配置属性。 登录模块描述了生产者和消费者等客户端如何连接到 Kafka 代理。 以下是 PLAIN 机制的客户端配置示例:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";

客户端使用选项用户名和密码进行配置 客户端连接的用户。在此示例中,客户端以用户 alice 身份连接到代理。 JVM 中的不同客户机可以通过指定不同的用户名作为不同的用户进行连接 和 中的密码。sasl.jaas.config

客户机的 JAAS 配置也可以指定为类似于代理的 JVM 参数 如此处所述。客户端使用名为 KafkaClient 的登录部分。此选项只允许一个用户访问来自 JVM 的所有客户机连接。

在生产者属性或消费者属性中配置以下属性:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
在生产中使用 SASL/PLAIN
SASL/PLAIN 应仅与 SSL 一起使用作为传输层,以确保明文密码不会在未加密的情况下在线传输。
Kafka 中 SASL/PLAIN 的默认实现指定 JAAS 配置文件中的用户名和密码,如下所示。从 Kafka 2.0 版本开始,您可以避免在磁盘上存储明文密码 通过配置您自己的回调处理程序,这些处理程序使用配置选项和 从外部源获取用户名和密码。sasl.server.callback.handler.classsasl.client.callback.handler.class
在生产系统中,外部身份验证服务器可以实现密码身份验证。从卡夫卡2.0版本开始, 您可以通过配置 来插入自己的回调处理程序,这些处理程序使用外部身份验证服务器进行密码验证。sasl.server.callback.handler.class

使用Java客户端进行连接的配置类:

package com.xxx.xxx.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.Map;

/**
 * @FileName: KafkaSecurityConfig.java
 * @Description: KafkaSecurityConfig.java类说明
 * @Date: 2023/2/1 17:16
 */
@Configuration
@EnableKafka
public class KafkaSecurityConfig {
    public class KafkaProducerConfig {
        @Autowired
        private KafkaProperties kafkaProperties;

        /**
         * 消费者配置
         */
        @Bean
        public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            Map<String, Object> props = kafkaProperties.buildConsumerProperties();
            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=alice password=alice-secret;");
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
            factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
            factory.setConcurrency(2);
            factory.getContainerProperties().setPollTimeout(1500);
            return factory;
        }


        /**
         * 生产者配置
         */
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            Map<String, Object> props = kafkaProperties.buildProducerProperties();
            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=alice password=alice-secret;");
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
            return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
        }
    }
}

有关Kafka增加安全验证安全认证,SASL认证,并通过spring boot-Java客户端连接配置的更多相关文章

  1. ruby-on-rails - 如何验证 update_all 是否实际在 Rails 中更新 - 2

    给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru

  2. ruby - 具有身份验证的私有(private) Ruby Gem 服务器 - 2

    我想安装一个带有一些身份验证的私有(private)Rubygem服务器。我希望能够使用公共(public)Ubuntu服务器托管内部gem。我读到了http://docs.rubygems.org/read/chapter/18.但是那个没有身份验证-如我所见。然后我读到了https://github.com/cwninja/geminabox.但是当我使用基本身份验证(他们在他们的Wiki中有)时,它会提示从我的服务器获取源。所以。如何制作带有身份验证的私有(private)Rubygem服务器?这是不可能的吗?谢谢。编辑:Geminabox问题。我尝试“捆绑”以安装新的gem..

  3. ruby-on-rails - 如果为空或不验证数值,则使属性默认为 0 - 2

    我希望我的UserPrice模型的属性在它们为空或不验证数值时默认为0。这些属性是tax_rate、shipping_cost和price。classCreateUserPrices8,:scale=>2t.decimal:tax_rate,:precision=>8,:scale=>2t.decimal:shipping_cost,:precision=>8,:scale=>2endendend起初,我将所有3列的:default=>0放在表格中,但我不想要这样,因为它已经填充了字段,我想使用占位符。这是我的UserPrice模型:classUserPrice回答before_val

  4. ruby-on-rails - 如何验证非模型(甚至非对象)字段 - 2

    我有一个表单,其中有很多字段取自数组(而不是模型或对象)。我如何验证这些字段的存在?solve_problem_pathdo|f|%>... 最佳答案 创建一个简单的类来包装请求参数并使用ActiveModel::Validations。#definedsomewhere,atthesimplest:require'ostruct'classSolvetrue#youcouldevencheckthesolutionwithavalidatorvalidatedoerrors.add(:base,"WRONG!!!")unlesss

  5. ruby - 检查数组是否在增加 - 2

    这个问题在这里已经有了答案:Checktoseeifanarrayisalreadysorted?(8个答案)关闭9年前。我只是想知道是否有办法检查数组是否在增加?这是我的解决方案,但我正在寻找更漂亮的方法:n=-1@arr.flatten.each{|e|returnfalseife

  6. ruby - 如何使用 Ruby aws/s3 Gem 生成安全 URL 以从 s3 下载文件 - 2

    我正在编写一个小脚本来定位aws存储桶中的特定文件,并创建一个临时验证的url以发送给同事。(理想情况下,这将创建类似于在控制台上右键单击存储桶中的文件并复制链接地址的结果)。我研究过回形针,它似乎不符合这个标准,但我可能只是不知道它的全部功能。我尝试了以下方法:defauthenticated_url(file_name,bucket)AWS::S3::S3Object.url_for(file_name,bucket,:secure=>true,:expires=>20*60)end产生这种类型的结果:...-1.amazonaws.com/file_path/file.zip.A

  7. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  8. ruby-on-rails - 如何将验证与模型分开 - 2

    我有一些非常大的模型,我必须将它们迁移到最新版本的Rails。这些模型有相当多的验证(User有大约50个验证)。是否可以将所有这些验证移动到另一个文件中?说app/models/validations/user_validations.rb。如果可以,有人可以提供示例吗? 最佳答案 您可以为此使用关注点:#app/models/validations/user_validations.rbrequire'active_support/concern'moduleUserValidationsextendActiveSupport:

  9. ruby-on-rails - 跳过状态机方法的所有验证 - 2

    当我的预订模型通过rake任务在状态机上转换时,我试图找出如何跳过对ActiveRecord对象的特定实例的验证。我想在reservation.close时跳过所有验证!叫做。希望调用reservation.close!(:validate=>false)之类的东西。仅供引用,我们正在使用https://github.com/pluginaweek/state_machine用于状态机。这是我的预订模型的示例。classReservation["requested","negotiating","approved"])}state_machine:initial=>'requested

  10. ruby - 如何在 Rails 4 中使用表单对象之前的验证回调? - 2

    我有一个服务模型/表及其注册表。在表单中,我几乎拥有服务的所有字段,但我想在验证服务对象之前自动设置其中一些值。示例:--服务Controller#创建Action:defcreate@service=Service.new@service_form=ServiceFormObject.new(@service)@service_form.validate(params[:service_form_object])and@service_form.saverespond_with(@service_form,location:admin_services_path)end在验证@ser

随机推荐