草庐IT

Kafka参数

小大宇 2023-09-03 原文

@KafkaListener注解

@KafkaListener(id = "11111", groupId = "demo-group",topics = Constants.TOPIC)
    public void listen(String msgData) {
    LOGGER.info("收到消息" + msgData);
}  

@KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix",
                                 topics = Constants.TOPIC)
    public void listen2(String msgData) {
    LOGGER.info("收到消息" + msgData);
}

@KafkaListener(id = "3333", groupId = "demo-group2", topics = Constants.TOPIC)
    public void listen3(String msgData) {
    LOGGER.info("收到消息" + msgData);
}

@KafkaListener(id = "4444", groupId = "demo-group2", topics = Constants.TOPIC)
    public void listen4(String msgData) {
    LOGGER.info("收到消息" + msgData);
}

(1)id:默认是每个Listener实例的重要标识。

        对于整个日志的排查起着至关重要的作用。如果不指定groupId,那么id将直接作为groupId。可以使用另外一个属性 idIsGroup=false关闭,默认是true。

(2)goupId:每个消费者所属的组。

        每个消费者都有自己所属的组。一个组中可以有多个消费者。

        一个Topic的分区只能被同一个组下的某个消费者消费。从日志上来看,侧面也反映的消费模式是 Subscribed 订阅模式,不是手动的assign模式。

[Consumer clientId=consumer-1, groupId=demo-group2] Subscribed to topic(s): COLA

[Consumer clientId=consumer-2, groupId=demo-group] Subscribed to topic(s): COLA

[Consumer clientId=consumer-3, groupId=demo-group2] Subscribed to topic(s): COLA

[Consumer clientId=prefix-0, groupId=demo-group] Subscribed to topic(s): COLA

 (3)clientIdPrefix:消费者clientId前缀

@KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix",
                                 topics = Constants.TOPIC)
    public void listen2(String msgData) {
    LOGGER.info("收到消息" + msgData);
}

        如下图,共有4个消费者。有个消费者配置了clientIdPrefix属性为"prefix",所以该消费者的clientId以指定的"prefix"开头。如果没有配置,该实例的clientId默认为"consumer"。同时,每个实例的clientId后面挂了一个数字,来标示它在整个kafka集群中的消费者编号,编号从0开始。我这里配置了4个消费者,所以消费者实例编号有0、 1、 2、 3。

 

        (4) autoStartup

public @interface KafkaListener ...
    /**
	 * Set to true or false, to override the default setting in the container factory. May
	 * be a property placeholder or SpEL expression that evaluates to a {@link Boolean} or
	 * a {@link String}, in which case the {@link Boolean#parseBoolean(String)} is used to
	 * obtain the value.
	 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
	 * @return true to auto start, false to not auto start.
	 * @since 2.2
	 */
	String autoStartup() default "";

        是否自动启动。如果是 false的话,那么默认不生效。需要手动唤醒。

        看源代码上作者给的的注释:该注解指定的值优先级比工厂里指定的高。

        另外可以使用 ${} 占位符的形式,支持配置。

application.yaml:
listener:
  auto:
    startup: true  

java :
    @KafkaListener(... containerFactory = "batchContainerFactory",
      autoStartup = "${listener.auto.startup}")
    public void listen4(List<ConsumerRecord> list, Acknowledgment acknowledgment)...

        手动唤醒实例代码如下:

注:每个消费者实例对象内部持有两个属性。

        boolean  running

        boolean  paused

有几个改变状态的方法:

调用start()方法后,running转为true

调用stop()方法后,running转为false

调用pause()方法后,paused转为true

调用resume()方法后,paused转为false    

只有running=true 、 paused=false 的消费者实例才能正常消费数据。

注解上的autoStartup改变的是running属性。

    @KafkaListener(id = "11111", groupId = "demo-group", 
                    topics = Constants.TOPIC, autoStartup = "false")
    public void listen(String msgData) throws InterruptedException {
        LOGGER.info("收到消息" + msgData);
        Thread.sleep(1000);
    }
唤醒消费者实例 示例代码

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    //获取到id="11111" 的消费实例对象
    MessageListenerContainer listenerContainer = 
                        this.registry.getListenerContainer("11111");

    //如果是非运行状态
    if(!listenerContainer.isRunning()){
        //开启消费者
        listenerContainer.start();
    }

    listenerContainer.resume();
暂停消费者实例 示例代码

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    //获取到id="11111" 的消费实例对象
    MessageListenerContainer listenerContainer = 
                        this.registry.getListenerContainer("11111");

    listenerContainer.pause();  //paused ==> true
  //listenerContainer.stop(); //running==> false

         玩的花:定时任务自动启动

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    //定时器,每天凌晨0点开启监听
    @Scheduled(cron = "0 0 0 * * ?")
    public void startListener() {
        log.info("开启监听");
        //判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer("11111").isRunning()) {
            registry.getListenerContainer("11111").start();
        }
        registry.getListenerContainer("11111").resume();
    }

    //定时器,每天早上10点关闭监听
    @Scheduled(cron = "0 0 10 * * ?")
    public void shutDownListener() {
        log.info("关闭监听");
        registry.getListenerContainer("11111").pause();
    }

@KafkaListener注解方法参数汇总

        @KafkaListener注解能够使用到如下8种方法上面。至于监听单条数据的前4种方法,与批量监听多条数据的后4种方法,那就要看你的kafka配置了。

    @KafkaListener(....)
    public void listen1(String data) 

    @KafkaListener(....)
    public void listen2(ConsumerRecord<K,V> data) 

    @KafkaListener(....)
    public void listen3(ConsumerRecord<K,V> data, Acknowledgment acknowledgment) 

    @KafkaListener(....)
    public void listen4(ConsumerRecord<K,V> data,
                        Acknowledgment acknowledgment, Consumer<K,V> consumer)

    @KafkaListener(....)
    public void listen5(List<String> data) 

    @KafkaListener(....)
    public void listen6(List<ConsumerRecord<K,V>> data) 

    @KafkaListener(....)
    public void listen7(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment) 

    @KafkaListener(....)
    public void listen8(List<ConsumerRecord<K,V>> data, 
                        Acknowledgment acknowledgment, Consumer<K,V> consumer)

 KafkaListenerContainerFactory配置

        我们在application.yaml中配置的kafka参数,以spring.kafka开头的参数族,全部用于kafka默认对象的创建。

        (1)所有kafka参数默认封装到对象:KafkaProperties对象中,可使用@Autowired自动注入。

    @Autowired
    private KafkaProperties properties;

        (2)@KakfkaListener注解标记的监听实例对象,如不特殊指定,则默认使用我们在yaml中的所有spring.kafka.consumer与spring.kafka.listener下的参数。

           为什么监听器实例对象会自动绑定到我们的配置文件呢?

           因为它默认使用的"containerFactory" 是名为"kafkaListenerContainerFactory"的bean。

           请看作者给我们留的注释。如果不特殊指定,则默认的容器工厂将会被使用。

package org.springframework.kafka.annotation;

public @interface KafkaListener ...
	/**
	 * The bean name of the {@link 
            org.springframework.kafka.config.KafkaListenerContainerFactory}

	 * to use to create the message listener container 
               responsible to serve this endpoint.
	 * <p>If not specified, the default container factory is used, if any.
	 * @return the container factory bean name.
	 */
	String containerFactory() default "";

        默认的容器工厂代码如下,均为Springboot与Kafka框架提供的类。代码虽然长,但是很关键。

        这两个bean将spring.kafka.listener与spring.kafka.consumer下的参数全部组装到名为"kafkaListenerContainerFactory"这个bean中。该bean供@KafkaListener标记的监听实例使用。

        因此可以得出结论:

        如果不想使用默认的"kafkaListenerContainerFactory"容器工厂,则必须手动创建一个"ConcurrentKafkaListenerContainerFactory"类的实例,并且其bean name 不能叫"kafkaListenerContainerFactory"(不然与默认的工厂实例重名了),然后把该对象加入spring容器中。当在使用@KafkaListener标注的监听实例对象时,手动指定该注解"containerFactory"属性为刚才自定义的容器工厂实例bean name。

package org.springframework.boot.autoconfigure.kafka;

class KafkaAnnotationDrivenConfiguration {

	@Bean
	@ConditionalOnMissingBean
	ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
		ConcurrentKafkaListenerContainerFactoryConfigurer configurer = 
                            new ConcurrentKafkaListenerContainerFactoryConfigurer();
		configurer.setKafkaProperties(this.properties);
		MessageConverter messageConverterToUse = 
                     (this.properties.getListener().getType().equals(Type.BATCH))
				                ? this.batchMessageConverter : this.messageConverter;
		configurer.setMessageConverter(messageConverterToUse);
		configurer.setReplyTemplate(this.kafkaTemplate);
		configurer.setTransactionManager(this.transactionManager);
		configurer.setRebalanceListener(this.rebalanceListener);
		configurer.setErrorHandler(this.errorHandler);
		configurer.setBatchErrorHandler(this.batchErrorHandler);
		configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
		configurer.setRecordInterceptor(this.recordInterceptor);
		return configurer;
	}

	@Bean
	@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
	ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
			ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
			ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
		ConcurrentKafkaListenerContainerFactory<Object, Object> factory = 
                                    new ConcurrentKafkaListenerContainerFactory<>();
		configurer.configure(factory, kafkaConsumerFactory
				.getIfAvailable(() -> 
           new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
		return factory;
	}

        自定义容器工厂实例代码示例:

    @Autowired
    private KafkaProperties properties;
    
    @Bean("batchContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainer() {
        ConcurrentKafkaListenerContainerFactory<?, ?> container =
                new ConcurrentKafkaListenerContainerFactory<>();

        Map<String, Object> stringObjectMap = this.properties.buildConsumerProperties();
        stringObjectMap.put("enable.auto.commit", false);
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(stringObjectMap));
        //没有topic是否禁止系统启动
        container.setMissingTopicsFatal(true);
        //并发
        container.setConcurrency(1);
        //批量接收
        container.setBatchListener(true);
        //如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。
        container.getContainerProperties().setPollTimeout(5000);
        //设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
        container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        //设置kafka 异常重试次数 第一个参数等待重试时间,第二个参数数提交次数,这里设置不重试,默认重试10次 抛出异常后调用
        //factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 0L)));
        return container;
    }


    @KafkaListener(id = "4444", groupId = "demo-group2", topics = Constants.TOPIC, containerFactory = "batchContainerFactory")
    public void listen4(List<ConsumerRecord> list, Acknowledgment acknowledgment) {
        LOGGER.info("4444收到消息" + list.size());
        acknowledgment.acknowledge();
    }

吞吐量

        如下,这里我只列出了影响本例的几条参数。

spring:
  kafka:
    consumer:
      enable-auto-commit: true
      # max-poll-records: 20

    listener:
      ack-mode: batch
      type: batch
      concurrency: 5

        如果我设置spring.kafka.listener.concurrency为5,共两个消费者,Topic名为"COLA",共8个分区。代码如下。

    @KafkaListener(id = "4444", groupId = "demo-group2", topics = "COLA")
    public void listen4(List<String> msgData) {
        LOGGER.info("收到消息" + msgData);
    }

    @KafkaListener(id = "5555", groupId = "demo-group2", topics = "COLA")
    public void listen5(List<String> msgData) {
        LOGGER.info("收到消息" + msgData);
    }

    @Bean
    public NewTopic newTopic() {
        return new NewTopic(Constants.TOPIC, 8, (short) 1);
    }

        系统每个消费者都创建了5个线程,共10个线程。换句话说,每个消费者实例(@KafkaListener标记的方法)同时都会有5个线程在跑。每个线程接收的分区都不一样。

        另外,这两个消费者属于同一个组,Topic只有8个分区,2个消费者共10个线程,一个线程消费一个分区,所以必然有两个线程最后属于空闲状态。

        从实际结果上来看(下面的日志),没想到系统为id="4444"的消费者实际只分配到了3个分区,有两个线程处于空闲状态。id="5555"的消费者达到了预期,共消费了5个分区,分配到了5个线程!

[4444-2-C-1]: demo-group2: partitions assigned: []
[4444-3-C-1]: demo-group2: partitions assigned: []
[4444-4-C-1]: demo-group2: partitions assigned: [COLA-1]
[4444-1-C-1]: demo-group2: partitions assigned: [COLA-7]
[5555-2-C-1]: demo-group2: partitions assigned: [COLA-3]
[5555-4-C-1]: demo-group2: partitions assigned: [COLA-5]
[5555-3-C-1]: demo-group2: partitions assigned: [COLA-4]
[4444-0-C-1]: demo-group2: partitions assigned: [COLA-6]
[5555-0-C-1]: demo-group2: partitions assigned: [COLA-0]
[5555-1-C-1]: demo-group2: partitions assigned: [COLA-2]

        结论:

        (1)concurrency设计的是多少,每个使用@KafkaListener的消费者实例就会创建多少个线程。当然了,最后创建的线程的线程可能没有分配到分区,所以就会一直闲置到系统中。

        (2)设置的并发量不能大于partition的数量,如果需要提高吞吐量,可以通过增加partition的数量达到快速提升吞吐量的效果。

后记

        这一顿分析下来,总算把我这些天的疑惑给解决了。之前只会简单使用Kafka,但是对这些参数与@KafkaListener注解的理解,还模糊不清。现在终于茅塞顿开了。

        上述结论都是我加以实践与思考的得出的结论,对于Kafka小白来说或许有一定的参考价值。作为开发人员,希望自己未来在面对Kafka线上问题的时候,不再像之前一样不知所措、不明所以。

有关Kafka参数的更多相关文章

  1. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  2. ruby - RSpec - 使用测试替身作为 block 参数 - 2

    我有一些Ruby代码,如下所示:Something.createdo|x|x.foo=barend我想编写一个测试,它使用double代替block参数x,这样我就可以调用:x_double.should_receive(:foo).with("whatever").这可能吗? 最佳答案 specify'something'dox=doublex.should_receive(:foo=).with("whatever")Something.should_receive(:create).and_yield(x)#callthere

  3. ruby - 如何在 Ruby 中拆分参数字符串 Bash 样式? - 2

    我正在为一个项目制作一个简单的shell,我希望像在Bash中一样解析参数字符串。foobar"helloworld"fooz应该变成:["foo","bar","helloworld","fooz"]等等。到目前为止,我一直在使用CSV::parse_line,将列分隔符设置为""和.compact输出。问题是我现在必须选择是要支持单引号还是双引号。CSV不支持超过一个分隔符。Python有一个名为shlex的模块:>>>shlex.split("Test'helloworld'foo")['Test','helloworld','foo']>>>shlex.split('Test"

  4. ruby - 检查方法参数的类型 - 2

    我不确定传递给方法的对象的类型是否正确。我可能会将一个字符串传递给一个只能处理整数的函数。某种运行时保证怎么样?我看不到比以下更好的选择:defsomeFixNumMangler(input)raise"wrongtype:integerrequired"unlessinput.class==FixNumother_stuffend有更好的选择吗? 最佳答案 使用Kernel#Integer在使用之前转换输入的方法。当无法以任何合理的方式将输入转换为整数时,它将引发ArgumentError。defmy_method(number)

  5. ruby-on-rails - 在默认方法参数中使用 .reverse_merge 或 .merge - 2

    两者都可以defsetup(options={})options.reverse_merge:size=>25,:velocity=>10end和defsetup(options={}){:size=>25,:velocity=>10}.merge(options)end在方法的参数中分配默认值。问题是:哪个更好?您更愿意使用哪一个?在性能、代码可读性或其他方面有什么不同吗?编辑:我无意中添加了bang(!)...并不是要询问nobang方法与bang方法之间的区别 最佳答案 我倾向于使用reverse_merge方法:option

  6. ruby - 定义方法参数的条件 - 2

    我有一个只接受一个参数的方法:defmy_method(number)end如果使用number调用方法,我该如何引发错误??通常,我如何定义方法参数的条件?比如我想在调用的时候报错:my_method(1) 最佳答案 您可以添加guard在函数的开头,如果参数无效则引发异常。例如:defmy_method(number)failArgumentError,"Inputshouldbegreaterthanorequalto2"ifnumbereputse.messageend#=>Inputshouldbegreaterthano

  7. ruby - rails 3 redirect_to 将参数传递给命名路由 - 2

    我没有找到太多关于如何执行此操作的信息,尽管有很多关于如何使用像这样的redirect_to将参数传递给重定向的建议:action=>'something',:controller=>'something'在我的应用程序中,我在路由文件中有以下内容match'profile'=>'User#show'我的表演Action是这样的defshow@user=User.find(params[:user])@title=@user.first_nameend重定向发生在同一个用户Controller中,就像这样defregister@title="Registration"@user=Use

  8. ruby - 字符串文字中的转义状态作为 `String#tr` 的参数 - 2

    对于作为String#tr参数的单引号字符串文字中反斜杠的转义状态,我觉得有些神秘。你能解释一下下面三个例子之间的对比吗?我特别不明白第二个。为了避免复杂化,我在这里使用了'd',在双引号中转义时不会改变含义("\d"="d")。'\\'.tr('\\','x')#=>"x"'\\'.tr('\\d','x')#=>"\\"'\\'.tr('\\\d','x')#=>"x" 最佳答案 在tr中转义tr的第一个参数非常类似于正则表达式中的括号字符分组。您可以在表达式的开头使用^来否定匹配(替换任何不匹配的内容)并使用例如a-f来匹配一

  9. ruby-on-rails - 如何生成传递一些自定义参数的 `link_to` URL? - 2

    我正在使用RubyonRails3.0.9,我想生成一个传递一些自定义参数的link_toURL。也就是说,有一个articles_path(www.my_web_site_name.com/articles)我想生成如下内容:link_to'Samplelinktitle',...#HereIshouldimplementthecode#=>'http://www.my_web_site_name.com/articles?param1=value1¶m2=value2&...我如何编写link_to语句“alàRubyonRailsWay”以实现该目的?如果我想通过传递一些

  10. ruby-on-rails - 在 Controller 中干净地处理多个过滤器(参数) - 2

    我有一个名为Post的类,我需要能够适应以下场景:如果用户选择了一个类别,则只显示该类别的帖子如果用户选择了一种类型,则只显示该类型的帖子如果用户选择了一个类别和类型,则只显示该类别中该类型的帖子如果用户没有选择任何内容,则显示所有帖子我想知道我的Controller是否不可避免地会因大量条件语句而显得粗糙...这是我解决此问题的错误方法-有谁知道我如何才能做到这一点?classPostsController 最佳答案 您最好遵循“胖模型,瘦Controller”的惯例,这意味着您应该将这种逻辑放在模型本身中。Post类应该能够报告

随机推荐