草庐IT

java - 避免apache kafka消费者重复消息的有效策略

coder 2023-05-15 原文

我已经学习 apache kafka 一个月了。然而,我现在陷入了困境。我的用例是,我有两个或多个消费者进程在不同的机器上运行。我进行了一些测试,其中我在 kafka 服务器中发布了 10,000 条消息。然后在处理这些消息时,我杀死了一个消费者进程并重新启动它。消费者正在将处理后的消息写入文件。所以消费完成后,文件显示超过 10k 条消息。所以有些消息是重复的。

在消费者进程中,我禁用了自动提交。消费者手动批量提交偏移量。因此,例如,如果将 100 条消息写入文件,消费者会提交偏移量。当单个消费者进程正在运行并且它崩溃并恢复时,以这种方式避免了重复。但是当多个消费者在运行并且其中一个崩溃并恢复时,它会将重复的消息写入文件。

是否有任何有效的策略来避免这些重复消息?

最佳答案

简短的回答是,不。

您正在寻找的是一次性处理。虽然它通常看起来可行,但永远不应该依赖它,因为总是有警告。

即使为了防止重复,您也需要使用简单消费者。这种方法的工作原理是针对每个消费者,当从某个分区消费消息时,将消费消息的分区和偏移量写入磁盘。当消费者在失败后重启时,从磁盘读取每个分区的最后消费偏移量。

但即使使用这种模式,消费者也不能保证它不会在失败后重新处理消息。如果消费者消费了一条消息,然后在偏移量刷新到磁盘之前失败了怎么办?如果在处理消息之前写入磁盘,如果在实际处理消息之前写入偏移量然后失败怎么办?即使您在每条消息后向 ZooKeeper 提交偏移量,也会存在同样的问题。

但在某些情况下, 精确一次处理更容易实现,但仅适用于某些用例。这只需要将您的偏移量存储在与单元应用程序的输出相同的位置。例如,如果您编写一个对消息进行计数的消费者,通过将最后计数的偏移量与每个计数一起存储,您可以保证该偏移量与消费者的状态同时存储。当然,为了保证一次性处理,这将要求您只使用一条消息并为每条消息只更新一次状态,这对于大多数 Kafka 消费者应用程序来说是完全不切实际的。出于性能原因,Kafka 本质上会批量消费消息。

如果您简单地将其设计为幂等性,通常您的时间会花得更多,您的应用程序也会更加可靠。

关于java - 避免apache kafka消费者重复消息的有效策略,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29647656/

有关java - 避免apache kafka消费者重复消息的有效策略的更多相关文章

  1. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  2. ruby - 如何进行排列以有效地定制输出 - 2

    这是一道面试题,我没有答对,但还是很好奇怎么解。你有N个人的大家庭,分别是1,2,3,...,N岁。你想给你的大家庭拍张照片。所有的家庭成员都排成一排。“我是家里的friend,建议家庭成员安排如下:”1岁的家庭成员坐在这一排的最左边。每两个坐在一起的家庭成员的年龄相差不得超过2岁。输入:整数N,1≤N≤55。输出:摄影师可以拍摄的照片数量。示例->输入:4,输出:4符合条件的数组:[1,2,3,4][1,2,4,3][1,3,2,4][1,3,4,2]另一个例子:输入:5输出:6符合条件的数组:[1,2,3,4,5][1,2,3,5,4][1,2,4,3,5][1,2,4,5,3][

  3. ruby-on-rails - 如何在 Rails View 上显示错误消息? - 2

    我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c

  4. ruby-on-rails - RSpec:避免使用允许接收的任何实例 - 2

    我正在处理旧代码的一部分。beforedoallow_any_instance_of(SportRateManager).toreceive(:create).and_return(true)endRubocop错误如下:Avoidstubbingusing'allow_any_instance_of'我读到了RuboCop::RSpec:AnyInstance我试着像下面那样改变它。由此beforedoallow_any_instance_of(SportRateManager).toreceive(:create).and_return(true)end对此:let(:sport_

  5. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  6. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

  7. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  8. ruby - 使用 Ruby 通过 Outlook 发送消息的最简单方法是什么? - 2

    我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=

  9. Ruby - 如何将消息长度表示为 2 个二进制字节 - 2

    我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi

  10. Observability:从零开始创建 Java 微服务并监控它 (二) - 2

    这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/

随机推荐