草庐IT

java - 卡夫卡再平衡。重复处理问题

coder 2024-03-08 原文

我有一个消费者 worker 应用程序,它在内部启动 X 个线程,每个线程都生成它的 KafkaCosnumer。 Cosnumers 具有相同的 groupId 并且订阅了相同的主题。因此,每个消费者都能公平地分配分区。

处理的本质是我不能丢失消息,也不能允许重复。我运行的kafka版本是0.10.2.1。

这是我面临的问题:消费者线程 1 开始消费消息,并且在 poll() 上获取了一批消息。我还实现了 ConsumerRebalanceListener,这样每次成功处理消息时,它都会被添加到 offsets 映射中。 (见下面的代码。)因此,一旦重新平衡发生,我可以在我的分区重新分配给其他消费者之前提交我的偏移量。 有时,为了处理该批处理,它需要比 max.poll.interval.ms 更长的时间,这是重新平衡发生的地方,分区从消费者 1 中拉出并分配给消费者 2。消费者 1 没有不知道分区已被撤销并继续处理消息,与此同时,消费者 2 从最后一个偏移量(由 RebalanceListener 提交)获取并处理相同的消息。

有没有办法通知消费者他已撤销分区,以便他可以停止处理循环中的消息,这些消息已经分配给其他消费者?

public class RebalanceListener<K, V> implements ConsumerRebalanceListener {

    private final KafkaConsumer<K, V> consumer;

    private static final ConcurrentMap<TopicPartition, OffsetAndMetadata> CURRENT_OFFSETS =
            Maps.newConcurrentMap();

    private static final Logger LOGGER = LoggerFactory.getLogger(RebalanceListener.class);

    public RebalanceListener(KafkaConsumer<K, V> consumer) {
        this.consumer = consumer;
    }

    public void addOffset(String topic, int partition, long offset) {
        LOGGER.debug("message=Adding offset to offsets map, topic={}, partition={}, offset={}",
                topic, partition, offset);
        CURRENT_OFFSETS.put(new TopicPartition(topic, partition),
                new OffsetAndMetadata(offset, "commit"));
    }

    public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
        return CURRENT_OFFSETS;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        LOGGER.debug("message=following partitions have been revoked from consumer: [{}]",
                partitions.stream().map(
                        topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
                        .collect(joining(",")));
        LOGGER.debug("message=Comitting offsets for partititions [{}]",
                CURRENT_OFFSETS.keySet().stream().map(
                        topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
                        .collect(joining(",")));
        consumer.commitSync(CURRENT_OFFSETS);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        LOGGER.debug("message=following partitions have been assigned to consumer: [{}]",
                partitions.stream().map(
                        topicPartition -> topicPartition.topic() + ":" + topicPartition.partition())
                        .collect(joining(",")));
    }

}

我想我可以在 RebalanceListener 内部创建 consumerId -- TopicPartition 的并发映射,然后在处理每条消息之前检查当前消费者是否仍然关联与记录(每个 ConsumerRecord 都有 topicpartition 字段)。 如果没有 - 打破循环并进行下一个 poll()

如果我的工作应用程序将在一个实例中运行,这将是一个可行的解决方案,即使有多个 KafkaConsumer 线程在旋转。但是一旦我按比例放大,我将无法在静态映射中存储偏移量和消费者主题分区映射。这必须是某种集中式存储、数据库,或者比方说 Redis。

但是,在每次我处理一个项目之前,我都必须询问我的记录是否可以被当前的消费者线程合法地处理。在扩展的工作应用程序的情况下,它将是对外部存储的网络调用,这将破坏使用 kafka 的目的,因为它会减慢处理速度。我可能只是选择在处理单个项目后执行偏移量提交。

最佳答案

你需要实现 onPartitionsRevoked()

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html#onPartitionsRevoked(java.util.Collection)

It is guaranteed that all consumer processes will invoke onPartitionsRevoked prior to any process invoking onPartitionsAssigned. So if offsets or other state is saved in the onPartitionsRevoked call it is guaranteed to be saved by the time the process taking over that partition has their onPartitionsAssigned callback called to load the state.

关于java - 卡夫卡再平衡。重复处理问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47103436/

有关java - 卡夫卡再平衡。重复处理问题的更多相关文章

  1. ruby - 在 64 位 Snow Leopard 上使用 rvm、postgres 9.0、ruby 1.9.2-p136 安装 pg gem 时出现问题 - 2

    我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po

  2. ruby - 通过 rvm 升级 ruby​​gems 的问题 - 2

    尝试通过RVM将RubyGems升级到版本1.8.10并出现此错误:$rvmrubygemslatestRemovingoldRubygemsfiles...Installingrubygems-1.8.10forruby-1.9.2-p180...ERROR:Errorrunning'GEM_PATH="/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/ruby-1.9.2-p180@global:/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/rub

  3. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  4. ruby - 匹配未转义的平衡定界符对 - 2

    如何匹配未被反斜杠转义的平衡定界符对(其本身未被反斜杠转义)(无需考虑嵌套)?例如对于反引号,我试过了,但是转义的反引号没有像转义那样工作。regex=/(?!$1:"how\\"#expected"how\\`are"上面的正则表达式不考虑由反斜杠转义并位于反引号前面的反斜杠,但我愿意考虑。StackOverflow如何做到这一点?这样做的目的并不复杂。我有文档文本,其中包括内联代码的反引号,就像StackOverflow一样,我想在HTML文件中显示它,内联代码用一些spanMaterial装饰。不会有嵌套,但转义反引号或转义反斜杠可能出现在任何地方。

  5. ruby - 通过 RVM (OSX Mountain Lion) 安装 Ruby 2.0.0-p247 时遇到问题 - 2

    我的最终目标是安装当前版本的RubyonRails。我在OSXMountainLion上运行。到目前为止,这是我的过程:已安装的RVM$\curl-Lhttps://get.rvm.io|bash-sstable检查已知(我假设已批准)安装$rvmlistknown我看到当前的稳定版本可用[ruby-]2.0.0[-p247]输入命令安装$rvminstall2.0.0-p247注意:我也试过这些安装命令$rvminstallruby-2.0.0-p247$rvminstallruby=2.0.0-p247我很快就无处可去了。结果:$rvminstall2.0.0-p247Search

  6. ruby - Fast-stemmer 安装问题 - 2

    由于fast-stemmer的问题,我很难安装我想要的任何ruby​​gem。我把我得到的错误放在下面。Buildingnativeextensions.Thiscouldtakeawhile...ERROR:Errorinstallingfast-stemmer:ERROR:Failedtobuildgemnativeextension./System/Library/Frameworks/Ruby.framework/Versions/2.0/usr/bin/rubyextconf.rbcreatingMakefilemake"DESTDIR="cleanmake"DESTDIR=

  7. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  8. ruby - 安装 Ruby 时遇到问题(无法下载资源 "readline--patch") - 2

    当我尝试安装Ruby时遇到此错误。我试过查看this和this但无济于事➜~brewinstallrubyWarning:YouareusingOSX10.12.Wedonotprovidesupportforthispre-releaseversion.Youmayencounterbuildfailuresorotherbreakages.Pleasecreatepull-requestsinsteadoffilingissues.==>Installingdependenciesforruby:readline,libyaml,makedepend==>Installingrub

  9. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  10. ruby-on-rails - 简单的 Ruby on Rails 问题——如何将评论附加到用户和文章? - 2

    我意识到这可能是一个非常基本的问题,但我现在已经花了几天时间回过头来解决这个问题,但出于某种原因,Google就是没有帮助我。(我认为部分问题在于我是一个初学者,我不知道该问什么......)我也看过O'Reilly的RubyCookbook和RailsAPI,但我仍然停留在这个问题上.我找到了一些关于多态关系的信息,但它似乎不是我需要的(尽管如果我错了请告诉我)。我正在尝试调整MichaelHartl'stutorial创建一个包含用户、文章和评论的博客应用程序(不使用脚手架)。我希望评论既属于用户又属于文章。我的主要问题是:我不知道如何将当前文章的ID放入评论Controller。

随机推荐