草庐IT

kafka-offset手动提交和自动提交

Fairy要carry 2023-04-21 原文

目录

首先回顾之前的知识点

 自动提交offset

 手动提交

消费者poll消息的细节

 完整代码:

 按照新方法进行消费消息

1.指定时间进行消息的消费

 2.指定分区开始从头消费+指定分区的偏移量开始消费

新消费组的消费offset规则


 

首先回顾之前的知识点

消费者消费消息,每消费offset+1,然后提交offset给到我们kafka中topic中的cousumer_offsets,该消费者宕机后,另外的消费者就会读取consumer_offsets读取我们的offset消费后面的消息

我们kafka消费者是自动拉取消息的,mq是队列push给消费者

自动提交:消息poll下来后(还没有消费)直接提交offset,速度很快,可能出现消费失败

手动提交:在消息消费时/消费后再提交offset

 自动提交offset

缺点:可能会丢消息,比如消费者poll了topic中partition的消息后,然后提交offset,可能消费者没有消费成功

提交的内容offset——>消费组+topic+offset

自动提交的配置

   /**
         * 1.1设置是否自动提交offset并设置offset的间隔时间
         */
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        

 

一poll就提交offset了 

 手动提交

分为手动同步提交+手动异步提交

手动同步提交:消息消费完后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示成功

  consumer.commitAsync();

手动异步提交:不需要等集群返回ack,直接执行后序的逻辑即可,我们可以设置一个回调方法

消费者poll消息的细节

定义:消费者会根据设置的消费时间来决定消费多少消息

properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,500)//拉取0.5s消息

 默认消费者一次性poll500条信息(长轮询时间为1s),如果时间内poll了500条就结束for循环

 //长轮询拉取时间,1s:消费者拉取1s时间不管拉了多少条消息(除非时间内拉取完了zk维护的topic分区中所有消息)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

 完整代码:

 while(true){
            //长轮询拉取时间,1s:消费者拉取1s时间不管拉了多少条消息(除非时间内拉取完了zk维护的topic分区中所有消息)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("收到的消息:partition= %d,offset= %d,key= %s,value=%s %n",record.partition(),
                        record.offset(),record.key(),record.value());
            }

            /**
             * 4.1手动提交:所有消息消费完再提交offset给broker中_consumer_offsets
             */
            if(records.count()>0){
                //同步:阻塞,提交成功,等待broker的返回ack
               consumer.commitAsync();

                //异步:提交完后不需要等待broker返回ack,直接往下走

            }
        }

 如果两次poll的间隔>30s,集群会认为该消费者消费能力弱将其踢出,触发rebalance机制,消息交给消费组中的其他消费者

  properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);

 按照新方法进行消费消息

1.指定时间进行消息的消费

1.根据时间将topic中partition分区信息全部放入map中——>2.然后指定时间,封装topic和分区与时间到map中——>3.最后再将map添加到更高级的map,key为分区,如果有两个分区就是2个map——>4.最后遍历,然后提取出value并得到offset打印

 2.指定分区开始从头消费+指定分区的偏移量开始消费

        //指定分区消费
        consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));

        /**
         * 4.回溯消费消息(指定某分区从头开始消费)
         */
        consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
        consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));

        /**
         * 4.1指定offset开始消费
         */
        consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME,0)));
        consumer.seek(new TopicPartition(TOPIC_NAME,0),10);

新消费组的消费offset规则

新消费组在启动后,默认是从当前分区最后一条消息的offset+1开始消费,可以通过配置进行重新消费

  /**
         * 2.13设置下次换了消费组还是按照offset记录继续消费
         */
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"earliest");

有关kafka-offset手动提交和自动提交的更多相关文章

  1. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  2. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  3. ruby-on-rails - 从应用程序中自定义文件夹内的命名空间自动加载 - 2

    我们目前正在为ROR3.2开发自定义cms引擎。在这个过程中,我们希望成为我们的rails应用程序中的一等公民的几个类类型起源,这意味着它们应该驻留在应用程序的app文件夹下,它是插件。目前我们有以下类型:数据源数据类型查看我在app文件夹下创建了多个目录来保存这些:应用/数据源应用/数据类型应用/View更多类型将随之而来,我有点担心应用程序文件夹被这么多目录污染。因此,我想将它们移动到一个子目录/模块中,该子目录/模块包含cms定义的所有类型。所有类都应位于MyCms命名空间内,目录布局应如下所示:应用程序/my_cms/data_source应用程序/my_cms/data_ty

  4. git使用常见问题(提交代码,合并冲突) - 2

    文章目录git常用命令(简介,详细参数往下看)Git提交代码步骤gitpullgitstatusgitaddgitcommitgitpushgit代码冲突合并问题方法一:放弃本地代码方法二:合并代码常用命令以及详细参数gitadd将文件添加到仓库:gitdiff比较文件异同gitlog查看历史记录gitreset代码回滚版本库相关操作远程仓库相关操作分支相关操作创建分支查看分支:gitbranch合并分支:gitmerge删除分支:gitbranch-ddev查看分支合并图:gitlog–graph–pretty=oneline–abbrev-commit撤消某次提交git用户名密码相关配置g

  5. ruby-on-rails - 有没有一种工具可以在编码时自动保存对文件的增量更改? - 2

    我最喜欢的Google文档功能之一是它会在我工作时不断自动保存我的文档版本。这意味着即使我在进行关键更改之前忘记在某个点进行保存,也很有可能会自动创建一个保存点。至少,我可以将文档恢复到错误更改之前的状态,并从该点继续工作。对于在MacOS(或UNIX)上运行的Ruby编码器,是否有具有等效功能的工具?例如,一个工具会每隔几分钟自动将Gitcheckin我的本地存储库以获取我正在处理的文件。也许我有点偏执,但这点小保险可以让我在日常工作中安心。 最佳答案 虚拟机有些人可能讨厌我对此的回应,但我在编码时经常使用VIM,它具有自动保存功

  6. ruby - 在 ruby​​ 中使用自动创建插入数组 - 2

    我想知道是否可以通过自动创建数组来插入数组,如果数组不存在的话,就像在PHP中一样:$toto[]='titi';如果尚未定义$toto,它将创建数组并将“titi”压入。如果已经存在,它只会推送。在Ruby中我必须这样做:toto||=[]toto.push('titi')可以一行完成吗?因为如果我有一个循环,它会测试“||=”,除了第一次:Person.all.eachdo|person|toto||=[]#with1billionofperson,thislineisuseless999999999times...toto.push(person.name)你有更好的解决方案吗?

  7. ruby-on-rails - 如何在 Rails 中添加禁用的提交按钮 - 2

    我在ruby​​表单中有一个提交按钮f.submitbtn_text,class:"btnbtn-onemgt12mgb12",id:"btn_id"我想在不使用任何javascript的情况下通过ruby​​禁用此按钮 最佳答案 添加disabled:true选项。f.submitbtn_text,class:"btnbtn-onemgt12mgb12",id:"btn_id",disabled:true 关于ruby-on-rails-如何在Rails中添加禁用的提交按钮,我们在St

  8. ruby-on-rails - 在 rails 中提交后回滚事务 - 2

    保存成功后可以回滚吗?让我有一个带有属性名称、电子邮件等的用户模型。例如u=User.newu.name="test_name"u.email="test@email.com"u.save现在记录将成功保存在数据库中,之后我想回滚我的事务(不是销毁或删除)。有什么想法吗? 最佳答案 您可以通过交易来做到这一点,请参阅http://markdaggett.com/blog/2011/12/01/transactions-in-rails/例子:User.transactiondoUser.create(:username=>'Nemu

  9. 【自动驾驶环境感知项目】——基于Paddle3D的点云障碍物检测 - 2

    文章目录1.自动驾驶实战:基于Paddle3D的点云障碍物检测1.1环境信息1.2准备点云数据1.3安装Paddle3D1.4模型训练1.5模型评估1.6模型导出1.7模型部署效果附录show_lidar_pred_on_image.py1.自动驾驶实战:基于Paddle3D的点云障碍物检测项目地址——自动驾驶实战:基于Paddle3D的点云障碍物检测课程地址——自动驾驶感知系统揭秘1.1环境信息硬件信息CPU:2核AI加速卡:v100总显存:16GB总内存:16GB总硬盘:100GB环境配置Python:3.7.4框架信息框架版本:PaddlePaddle2.4.0(项目默认框架版本为2.3

  10. ruby-on-rails - 自动完成搜索的 Rails 实现 - 2

    我不确定如何为我的搜索功能添加自动完成表单。"get"do%>nil%>我有一个具有自定义操作的Controllerdefquery@users=Search.user(params[:query])@article=Search.article(params[:query])end模型如下:defself.user(search)ifsearchUser.find(:all,:conditions=>['first_nameLIKE?',"%#{search}%"])elseUser.find(:all)endenddefself.article(search)ifsearchArt

随机推荐