草庐IT

kafka实战-消费者offset重置问题

弋在西元前 2023-07-17 原文

kafka实战-消费者offset重置问题

背景

背景:当app启动时,会调用 “启动上报接口” 上报启动数据,该数据包含且不限于手机型号、应用版本、app类型、启动时间等,一站式接入平台系统会记录该数据。

生产者:“启动上报接口”会根据启动数据发送一条kafka消息,topic“xxx”

消费者:“启动处理模块”会监控topic “xxx”,当发现消息时进行消费,将启动数据存放至相应的数据库中。

问题现象

当生产者和消费者的项目都启动后,我们发现,生产者在不断的生产消息,消费者在不断的进行消费,查询数据库中的启动数据也确实增加了,但是当前 consumer 的 offset 却一直是0。

监控了十分钟,发现如下现象:数据库数据确实增加,但是相同的数据却重复出现了N次,N随时间增加。

查看日志发现如下现象:

2023-02-01 23:13:55,323 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator[273] - [Consumer clientId=start-consumer-35, groupId=app-loginfo-message-consumer] Adding newly assigned partitions: CONSUMER-7
2023-02-01 23:13:55,434 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator[1306] - [Consumer clientId=start-consumer-35, groupId=app-loginfo-message-consumer] Found no committed offset for partition CONSUMER-7
2023-02-01 23:13:55,716 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState[397] - [Consumer clientId=start-consumer-35, groupId=app-loginfo-message-consumer] Resetting offset for partition CONSUMER-7 to offset 0.
2023-02-01 23:13:55,717 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer[292] - app-loginfo-message-consumer: partitions assigned: [CONSUMER-7]
...						 
2023-02-01 23:19:23,425 [...KafkaListenerEndpointContainer#5-7-C-1] INFO com...service.impl.StartServiceImpl[200] - [TID:4cf33af14b3a4b8cb76f4179594f38f2_201_16752639927830001] --- enterStartupLog success:业务日志
2023-02-01 23:19:23,425 [...KafkaListenerEndpointContainer#5-7-C-1] INFO com...listener.ApiKafkaListener[58] - [TID:4cf33af14b3a4b8cb76f4179594f38f2_201_16752639927830001] --- tag:UPMAPI_APP_START_REPORT, message:业务日志
2023-02-01 23:19:23,425 [...KafkaListenerEndpointContainer#5-7-C-1] INFO com...service.impl.StatisticsServiceImpl[142] - [TID:4cf33af14b3a4b8cb76f4179594f38f2_201_16752639927830001] --- 业务日志
2023-02-01 23:19:23,811 [...KafkaListenerEndpointContainer#5-7-C-1] INFO com...service.impl.StartServiceImpl[200] - [TID:4cf33af14b3a4b8cb76f4179594f38f2_201_16752639927830001] --- enterStartupLog success:业务日志
2023-02-01 23:19:23,811 [...KafkaListenerEndpointContainer#5-7-C-1] INFO com...listener.ApiKafkaListener[58] - [TID:4cf33af14b3a4b8cb76f4179594f38f2_201_16752639927830001] --- tag:UPMAPI_APP_START_REPORT, message:业务日志
2023-02-01 23:19:23,812 [...KafkaListenerEndpointContainer#5-7-C-1] INFO com...service.impl.StatisticsServiceImpl[142] - [TID:4cf33af14b3a4b8cb76f4179594f38f2_201_16752639927830001] --- 业务日志
2023-02-01 23:19:24,108 [...KafkaListenerEndpointContainer#5-7-C-1] INFO com...service.impl.StartServiceImpl[200] - [TID:4cf33af14b3a4b8cb76f4179594f38f2_201_16752639927830001] --- enterStartupLog success:业务日志
2023-02-01 23:19:24,108 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org...apache.kafka.clients.consumer.internals.ConsumerCoordinator[1107] - [Consumer clientId=start-consumer-35, groupId=app-loginfo-message-consumer] Failing OffsetCommit request since the consumer is not part of an active group
2023-02-01 23:19:24,109 [...KafkaListenerEndpointContainer#5-7-C-1] ERROR org.apache.kafka.clients.consumer.internals.AbstractCoordinator[1013] - [Consumer clientId=start-consumer-35, groupId=app-loginfo-message-consumer] LeaveGroup request failed with error: The coordinator is not aware of this member.
2023-02-01 23:19:24,109 [...KafkaListenerEndpointContainer#5-7-C-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer[149] - Consumer exception java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151) at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1361) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1063) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1116) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:983) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2311) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2306) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2292) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2106) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke$original$cUFzUMfk(KafkaMessageListenerContainer.java:1097) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke$original$cUFzUMfk$accessor$KCXsM7NI(KafkaMessageListenerContainer.java) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$auxiliary$YPllDASy.call(Unknown Source) at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1031) ... 3 common frames omitted
2023-02-01 23:19:24,109 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator[676] - [Consumer clientId=start-consumer-35, groupId=app-loginfo-message-consumer] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2023-02-01 23:19:24,109 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator[311] - [Consumer clientId=consumer-app-loginfo-message-consumer-35, groupId=app-loginfo-message-consumer] Lost previously assigned partitions CONSUMER-7
2023-02-01 23:19:24,109 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer[292] - app-loginfo-message-consumer: partitions lost: [CONSUMER-7]
2023-02-01 23:19:24,109 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer[292] - app-loginfo-message-consumer: partitions revoked: [CONSUMER-7]
2023-02-01 23:19:24,110 [...KafkaListenerEndpointContainer#5-7-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator[552] - [Consumer clientId=start-consumer-35, groupId=app-loginfo-message-consumer] (Re-)joining group

23:13:55发现消息并将offset置为0,然后开始poll,进行消费。

处理到23:19:24时全部数据处理完毕,开始提交offset。此时报错提交失败,消费者已离开消费组,当前消费者已分配的分区丢失。

然后当前消费者重新加入消费者,offset重置为0,从头开始消费。

分析原因

从以上日志可以看出,消费者正确消费,不存在报错的现象,kafka offset手动提交代码无误。因此可以断定报错原因不是业务异常,而是由于kafka自身机制导致的提交失败。

查看发现当前消费者制配置如下:

allow.auto.create.topics = true
auto.commit.interval.ms = 1000
auto.offset.reset = earliest
bootstrap.servers = [ip0:9092, ip1:9092, ip2:9092]
check.crcs = true
client.dns.lookup = default
client.id = 
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = app-loginfo-message-consumer
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 30000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

通过查看配置发现,由于启动数据量大,消费者每次拉取的消息数量满载,达到max.poll.records = 500,即每次拉取500条消息进行消费,全部消费完毕后进行offset提交。

然而通过上述日志可以发现,一条消息消费的时间接近一秒,500条数据消费的总时间≈500*1s=500s,该时间远大于max.poll.interval.ms=300000设置的最大拉取时间(300s)。

因此,当达到最大时间300s时,kafka client发现当前消费者仍未进行过拉取,因此认为当前消费者处于非健康状态,将其从消费者组中剔除。

所以,当消费者消费500条消息完毕后进行提交时,发现自己不在消费者组中,故而提交offset失败。
提交失败后,当前消费者重新加入消费者组,offset又从起点开始拉取。

上述过程反复进行,导致offset不断被重置。

问题解决

通过问题分析我们得到,当前现象产生的关键点是:“指定时间不足以消费指定数量消息” 的问题。所以可得两种解决方式进行解决:一种是减小max.poll.records,使得指定时间内能够完成全部消费;另一种是增大max.poll.interval.ms,使得当前数量的消息全部消费完毕后,还未达到时间阈值。

由于以下两点原因,我们决定通过第一种方式,即减小max.poll.records的方式来解决该问题:

1.由于在生产环境,我们的消费者服务是docker部署,支持动态扩容的,且配置了concurrency=#{partitionCount}进行多线程消费,所以处理速度完全满足业务需求,不会造成消息堆积。

2.max.poll.interval.ms相当于消费者的探活时间,默认=300000即3分钟,如果设置过大则失去原有意义,所以一般情况下,在业务能力可以得到满足时,不建议修改该配置。

结果:设置max.poll.records=100,问题解决。

附-常见的消费者配置描述和调优方案

1. max.poll.records

作用:意味消费者一次poll()操作,能够获取的最大数据量。

注意:调整这个值能提升吞吐量,于此同时也需要同步提升max.poll.interval.ms的参数大小。设置之前,应该考虑并计算所有数据处理完毕的时间,务必要小于max.poll.interval.ms的参数大小。

2. fetch.max.bytes

作用:意味server端可返回给consumer的最大数据大小

注意:增加可以提升吞吐量,但是在客户端和服务端网络延迟比较大的环境下,建议可以减小该值,防止业务处理数据超时。

3. heartbeat.interval.ms

作用:消费超时时间,consumer与kafka之间的超时时间

注意:该参数不能超过session.timeout.ms,通常设置为session.timeout.ms的三分之一,默认值:3000。

4. max.partition.fetch.bytes

作用:限制每个consumer发起fetch请求时候,读到数据(record)的限制

注意:设置过大,consumer本地缓存的数据就会越多,可能影响内存的使用,默认值:1048576。

5. fetch.max.bytes

作用:server端可返回给consumer的最大数据大小

注意:数值可大于max.partition.fetch.bytes,一般设置为默认值即可,默认值:52428800

6. session.timeout.ms

作用:使用consumer组管理offset时,consumer与broker之间的心跳超时时间

注意:如果consumer消费数据的频率非常低,建议增大这个参数值,默认值:10000。

7. auto.offset.reset

作用:消费过程中无法找到数据消费到的offset位置,所选择的消费策略

注意:earliest:从头开始消费,可能会消费到重复数据,latest:从数据末尾开始消费,可能会丢失数据。默认值:earlist。

8. max.poll.interval.ms

作用:消费者在每一轮poll() (拉取数据之间的最大时间延迟),默认值:300s

注意:如果此超时时间期满之前poll()没有被再次调用,则消费者被视为失败,并且分组将触发rebalance,以便将分区重新分配给别的成员

如果,再两次poll之间需要添加过多复杂的,耗时的逻辑,需要延长这个时间;当然,如果当前消费的消息消费结果可以忽略,即失败也无所谓,比如某些统计次数的场景等,数量有少数差异不影响业务使用,那么可以设置为异步处理,此时就不会消耗大量时间。

9. max.poll.records

作用:消费者一次poll()操作,能够获取的最大数据量

注意:增加这个参数值,会增加一次性拉取数据的数据量,确保拉取数据的时间,至少在max.poll.interval.ms规定的范围之内,默认值:500

更多优化内容可查看 5种kafka消费端性能优化方法

有关kafka实战-消费者offset重置问题的更多相关文章

  1. ruby - 在 64 位 Snow Leopard 上使用 rvm、postgres 9.0、ruby 1.9.2-p136 安装 pg gem 时出现问题 - 2

    我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po

  2. ruby - 通过 rvm 升级 ruby​​gems 的问题 - 2

    尝试通过RVM将RubyGems升级到版本1.8.10并出现此错误:$rvmrubygemslatestRemovingoldRubygemsfiles...Installingrubygems-1.8.10forruby-1.9.2-p180...ERROR:Errorrunning'GEM_PATH="/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/ruby-1.9.2-p180@global:/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/rub

  3. ruby - 通过 RVM (OSX Mountain Lion) 安装 Ruby 2.0.0-p247 时遇到问题 - 2

    我的最终目标是安装当前版本的RubyonRails。我在OSXMountainLion上运行。到目前为止,这是我的过程:已安装的RVM$\curl-Lhttps://get.rvm.io|bash-sstable检查已知(我假设已批准)安装$rvmlistknown我看到当前的稳定版本可用[ruby-]2.0.0[-p247]输入命令安装$rvminstall2.0.0-p247注意:我也试过这些安装命令$rvminstallruby-2.0.0-p247$rvminstallruby=2.0.0-p247我很快就无处可去了。结果:$rvminstall2.0.0-p247Search

  4. ruby - Fast-stemmer 安装问题 - 2

    由于fast-stemmer的问题,我很难安装我想要的任何ruby​​gem。我把我得到的错误放在下面。Buildingnativeextensions.Thiscouldtakeawhile...ERROR:Errorinstallingfast-stemmer:ERROR:Failedtobuildgemnativeextension./System/Library/Frameworks/Ruby.framework/Versions/2.0/usr/bin/rubyextconf.rbcreatingMakefilemake"DESTDIR="cleanmake"DESTDIR=

  5. ruby - 安装 Ruby 时遇到问题(无法下载资源 "readline--patch") - 2

    当我尝试安装Ruby时遇到此错误。我试过查看this和this但无济于事➜~brewinstallrubyWarning:YouareusingOSX10.12.Wedonotprovidesupportforthispre-releaseversion.Youmayencounterbuildfailuresorotherbreakages.Pleasecreatepull-requestsinsteadoffilingissues.==>Installingdependenciesforruby:readline,libyaml,makedepend==>Installingrub

  6. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  7. ruby-on-rails - 简单的 Ruby on Rails 问题——如何将评论附加到用户和文章? - 2

    我意识到这可能是一个非常基本的问题,但我现在已经花了几天时间回过头来解决这个问题,但出于某种原因,Google就是没有帮助我。(我认为部分问题在于我是一个初学者,我不知道该问什么......)我也看过O'Reilly的RubyCookbook和RailsAPI,但我仍然停留在这个问题上.我找到了一些关于多态关系的信息,但它似乎不是我需要的(尽管如果我错了请告诉我)。我正在尝试调整MichaelHartl'stutorial创建一个包含用户、文章和评论的博客应用程序(不使用脚手架)。我希望评论既属于用户又属于文章。我的主要问题是:我不知道如何将当前文章的ID放入评论Controller。

  8. 【高数】用拉格朗日中值定理解决极限问题 - 2

    首先回顾一下拉格朗日定理的内容:函数f(x)是在闭区间[a,b]上连续、开区间(a,b)上可导的函数,那么至少存在一个,使得:通过这个表达式我们可以知道,f(x)是函数的主体,a和b可以看作是主体函数f(x)中所取的两个值。那么可以有,  也就意味着我们可以用来替换 这种替换可以用在求某些多项式差的极限中。方法: 外层函数f(x)是一致的,并且h(x)和g(x)是等价无穷小。此时,利用拉格朗日定理,将原式替换为 ,再进行求解,往往会省去复合函数求极限的很多麻烦。使用要注意:1.要先找到主体函数f(x),即外层函数必须相同。2.f(x)找到后,复合部分是等价无穷小。3.要满足作差的形式。如果是加

  9. SPI接收数据异常问题总结 - 2

    SPI接收数据左移一位问题目录SPI接收数据左移一位问题一、问题描述二、问题分析三、探究原理四、经验总结最近在工作在学习调试SPI的过程中遇到一个问题——接收数据整体向左移了一位(1bit)。SPI数据收发是数据交换,因此接收数据时从第二个字节开始才是有效数据,也就是数据整体向右移一个字节(1byte)。请教前辈之后也没有得到解决,通过在网上查阅前人经验终于解决问题,所以写一个避坑经验总结。实际背景:MCU与一款芯片使用spi通信,MCU作为主机,芯片作为从机。这款芯片采用的是它规定的六线SPI,多了两根线:RDY和INT,这样从机就可以主动请求主机给主机发送数据了。一、问题描述根据从机芯片手

  10. 微信小程序开发入门与实战(Behaviors使用) - 2

    @作者:SYFStrive @博客首页:HomePage📜:微信小程序📌:个人社区(欢迎大佬们加入)👉:社区链接🔗📌:觉得文章不错可以点点关注👉:专栏连接🔗💃:感谢支持,学累了可以先看小段由小胖给大家带来的街舞👉微信小程序(🔥)目录自定义组件-behaviors    1、什么是behaviors    2、behaviors的工作方式    3、创建behavior    4、导入并使用behavior    5、behavior中所有可用的节点    6、同名字段的覆盖和组合规则总结最后自定义组件-behaviors    1、什么是behaviorsbehaviors是小程序中,用于实现

随机推荐