草庐IT

云原生中间件RocketMQ-消费者核心参数、消费模式之集群模式

共饮一杯无 2023-07-20 原文

文章目录

PushConsumer核心参数详解

  1. consumeFromWhere:消费者从那个位置开始消费
  • CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
  • CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
  • CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费
  1. allocateMessageQueueStrategy:消息分配策略(集群模式),指定如何将消息队列分配给每个客户端。包括一致性hash、平均分配、平均轮询分配、自定义消费队列、按机房hash算法实现。默认平均轮询AllocateMessageQueueAveragely,Rebalance(轮询)算法实现策略。

平均分配的实现算法

  • 如果消费者的个数可以除尽队列的个数,那么就完全平均分。
  • 如果不能除尽。那么靠前的消费者多消费一个队列,靠后的消费平均数个队列。
  • 如果消费者的个数大于队列的个数,那么靠前的消费者消费一个队列,后面的不消费。
  1. subscription:订阅关系。

  1. offsetStore: 消息进度存储。
  • 集群消费:从远程Broker获取。
  • 广播消费:从本地文件获取。
  1. consumeThreadMin/consumeThreadMax:最小消费线程数,最大消费线程数,默认都是20。

  2. consumeTimeout:消息阻塞使用线程的最长时间(以分钟为单位),默认15。

  3. pullThresholdSizeForQueue:在队列级别限制缓存的消息大小,默认情况下每个消息队列最多缓存100MiB消息。

  4. consumeConcurrentlyMaxSpan / pullThresholdForQueue: 单队列并行消费允许的最大跨度,默认值2000, 队列级别的流量控制阈值,拉消息本地队列缓存消息最大数,默认1000。

  5. pullInterval: 消息拉取时间间隔,由于是长轮询,所以为0,但是如果应用为了流控,也可以设置大于0的值,单位毫秒,默认0。

  6. pullBatchSize: 批量拉消息,一次最多拉多少条,默认32。

  7. consumeMessageBatchMaxSize: 批量消费,一次消费多少条消息,默认1。

  8. maxReconsumeTimes:最大重试次数,并行模式下默认-1也就是16次,顺序模式下默认-1表示Integer.MAX_VALUE。

  9. awaitTerminationMillisWhenShutdown:当关闭消费者时等待消息消耗的最大时间,0表示没有等待。默认0。

  10. messageModel:消息模型定义了将消息传递到每个客户端的方式,默认集群模式。

PushConsumer消费模式-集群模式

GroupName用于把多个Consumer组织到一起,相同GroupName的Consumer只消费所订阅消息的一部分。
目的:达到天然的负载均衡机制。发消息队列数要和consumer数量为倍数,才能平均负载均衡。
消费者采用负载均衡(集群模式)方式消费消息,一个分组(Group)下的多个消费者共同消费队列消息,每个消费者处理的消息不同。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。例如某个Topic有3个队列,其中一个Consumer Group 有 3 个实例,那么每个实例只消费其中的1个队列。集群消费模式是消费者默认的消费方式。
集群模式: 使用相同 Group ID 的订阅者属于同一个集群。 同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用) , 这些订阅者在逻辑上可以认为是一个消费节点。

消费端通过如下代码指定消费模式:

// 消费模式 默认是集群模式(负载均衡模式),还有是广播模式
consumer.setMessageModel(MessageModel.CLUSTERING);

集群模式下,每个消费者消费的肯定不是同一个消息。

集群模式下每一个queue都只能被一个消费者消费,但是每一个消费者都可以消费多个queue。

集群模式适用场景&注意事项:

  • 消费端集群化部署, 每条消息只需要被处理一次;
  • 由于消费进度在服务端维护, 可靠性更高。
  • Topic + Tag下的消息可以保证肯定会被整个集群至少消费一次 ;
  • 不保证每一次失败重投的消息路由到同一台机器上, 因此处理消息时不应该做任何确定性假设。
  • 集群中的每个消费者消费的消息肯定不会是同一条消息,因为实际上在集群模式下
    • 每一个queue都只能被一个消费者消费
    • 但是每一个消费者都可以消费多个queue

本文内容到此结束了,
如有收获欢迎点赞👍收藏💖关注✔️,您的鼓励是我最大的动力。
如有错误❌疑问💬欢迎各位大佬指出。
主页共饮一杯无的博客汇总👨‍💻

保持热爱,奔赴下一场山海。🏃🏃🏃

有关云原生中间件RocketMQ-消费者核心参数、消费模式之集群模式的更多相关文章

  1. ruby-on-rails - Rails - 子类化模型的设计模式是什么? - 2

    我有一个模型:classItem项目有一个属性“商店”基于存储的值,我希望Item对象对特定方法具有不同的行为。Rails中是否有针对此的通用设计模式?如果方法中没有大的if-else语句,这是如何干净利落地完成的? 最佳答案 通常通过Single-TableInheritance. 关于ruby-on-rails-Rails-子类化模型的设计模式是什么?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.co

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  4. ruby - 如何在续集中重新加载表模式? - 2

    鉴于我有以下迁移:Sequel.migrationdoupdoalter_table:usersdoadd_column:is_admin,:default=>falseend#SequelrunsaDESCRIBEtablestatement,whenthemodelisloaded.#Atthispoint,itdoesnotknowthatusershaveais_adminflag.#Soitfails.@user=User.find(:email=>"admin@fancy-startup.example")@user.is_admin=true@user.save!ende

  5. ruby - RSpec - 使用测试替身作为 block 参数 - 2

    我有一些Ruby代码,如下所示:Something.createdo|x|x.foo=barend我想编写一个测试,它使用double代替block参数x,这样我就可以调用:x_double.should_receive(:foo).with("whatever").这可能吗? 最佳答案 specify'something'dox=doublex.should_receive(:foo=).with("whatever")Something.should_receive(:create).and_yield(x)#callthere

  6. ruby - 如何在 Ruby 中拆分参数字符串 Bash 样式? - 2

    我正在为一个项目制作一个简单的shell,我希望像在Bash中一样解析参数字符串。foobar"helloworld"fooz应该变成:["foo","bar","helloworld","fooz"]等等。到目前为止,我一直在使用CSV::parse_line,将列分隔符设置为""和.compact输出。问题是我现在必须选择是要支持单引号还是双引号。CSV不支持超过一个分隔符。Python有一个名为shlex的模块:>>>shlex.split("Test'helloworld'foo")['Test','helloworld','foo']>>>shlex.split('Test"

  7. ruby - 检查方法参数的类型 - 2

    我不确定传递给方法的对象的类型是否正确。我可能会将一个字符串传递给一个只能处理整数的函数。某种运行时保证怎么样?我看不到比以下更好的选择:defsomeFixNumMangler(input)raise"wrongtype:integerrequired"unlessinput.class==FixNumother_stuffend有更好的选择吗? 最佳答案 使用Kernel#Integer在使用之前转换输入的方法。当无法以任何合理的方式将输入转换为整数时,它将引发ArgumentError。defmy_method(number)

  8. ruby-on-rails - 在默认方法参数中使用 .reverse_merge 或 .merge - 2

    两者都可以defsetup(options={})options.reverse_merge:size=>25,:velocity=>10end和defsetup(options={}){:size=>25,:velocity=>10}.merge(options)end在方法的参数中分配默认值。问题是:哪个更好?您更愿意使用哪一个?在性能、代码可读性或其他方面有什么不同吗?编辑:我无意中添加了bang(!)...并不是要询问nobang方法与bang方法之间的区别 最佳答案 我倾向于使用reverse_merge方法:option

  9. ruby - 定义方法参数的条件 - 2

    我有一个只接受一个参数的方法:defmy_method(number)end如果使用number调用方法,我该如何引发错误??通常,我如何定义方法参数的条件?比如我想在调用的时候报错:my_method(1) 最佳答案 您可以添加guard在函数的开头,如果参数无效则引发异常。例如:defmy_method(number)failArgumentError,"Inputshouldbegreaterthanorequalto2"ifnumbereputse.messageend#=>Inputshouldbegreaterthano

  10. ruby - rails 3 redirect_to 将参数传递给命名路由 - 2

    我没有找到太多关于如何执行此操作的信息,尽管有很多关于如何使用像这样的redirect_to将参数传递给重定向的建议:action=>'something',:controller=>'something'在我的应用程序中,我在路由文件中有以下内容match'profile'=>'User#show'我的表演Action是这样的defshow@user=User.find(params[:user])@title=@user.first_nameend重定向发生在同一个用户Controller中,就像这样defregister@title="Registration"@user=Use

随机推荐