我们在分析生产者的时候有专门写过文章分析生产者的分区分配策略
Kafka生产者的3种分区策略
生成者的分配策略是把我们产生的消息选择一个合适的分区去发送,
那么今天我们要讲解一下 消费者的分区分配策略 他要做的事情是
同一个消费组中 给不同消费者分配能够消费的分区数;
同一个消费组中,一个分区只会被一个消费者消费。
每个消费组客户端都可以配置一个partition.assignment.strategy属性 并且可以配置多个自己支持的分配策略,例如:
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RoundRobinAssignor
默认策略是 org.apache.kafka.clients.consumer.RoundRobinAssignor
既然每个客户端成员都可以配置多个自己支持的分配策略, 那么GroupCoordinator(消费组协调器)使用哪个分配策略去分配这些资源呢?
肯定是需要消费组下面的所有成员都使用同一种分配策略来进行分配。所以GroupCoordinator就面临着选择哪个分配策略。
选择的逻辑如下
partition.assignment.strategy配置靠前的策略。请看下面的2个例子
| case | consumer-0 | consumer-1 | consumer-2 | 选中策略 |
|---|---|---|---|---|
| case-1 | roundrobin,rang | rang,roundrobin,strick | roundrobin,rang | roundrobin |
| case-2 | strick,roundrobin,rang | rang,roundrobin | strick ,rang | rang |
Case-1
Case-2
如果新Member加入Group的时候, 带上的分配策略跟现有Group中所有Member(Group有Member的情况下)都支持的协议都不交叉
那么就会抛出异常:INCONSISTENT_GROUP_PROTOCOL
[2022-09-08 14:34:12,508] INFO [Consumer clientId=client2, groupId=consumer0] Rebalance failed. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list.
[2022-09-08 14:34:12,511] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
这个协议的选择的代码逻辑在 GroupMetadata#selectProtocol
调用的时机是当前发起JoinGroup的Member都完成JoinGroup,并调用onCompleteJoin
具体详情可以看 : Kafka消费者JoinGroupRequest流程解析

既然我们已经知道了分区分配策略的选择, 那么什么时候会触发这个策略的逻辑计算呢?
如果你有看过之前的文章: Kafka消费者JoinGroupRequest流程解析 那么对此就肯定会有一定的了解
当所有的Member(成员)发起JoinGroup请求, 并且组协调器(GroupCoordinator)也都处理正常,就会回调当前发起JoinGroup请求的Member(成员)
其中有个最特别的就是, 组协调器(GroupCoordinator)会把所有的Member(成员)的元信息打包一并返回给那个Leader Member, 而Follow Member是不会返回的。
Leader Member 接受到回调并拿到这个元信息之后, 就开始去计算每个成员应该被分配到的分区。
代码定位
ConsumerCoordinator#performAssignment
@Override
protected Map<String, ByteBuffer> performAssignment(String leaderId,
String assignmentStrategy,
List<JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions) {
ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
//省略部分代码...
// 更新一下所有订阅的Topic的元信息
// 如果有变更的元信息则更新一下
updateGroupSubscription(allSubscribedTopics);
//省略部分代码...
Map<String, Assignment> assignments = assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();
if (protocol == RebalanceProtocol.COOPERATIVE) {
validateCooperativeAssignment(ownedPartitions, assignments);
}
//省略部分代码...
}
上面的代码主要是 根据分配策略,获取分配策略实例, 然后调用 assign方法进行计算,得到分配方式。
但是最终调用的计算逻辑是每个AbstractPartitionAssignor实现类的assign方法。
并且也可以实现自定义的分配策略.只需要实现接口AbstractPartitionAssignor就行。
在 3.1 分配策略计算时机 中我们知道分配策略的计算时机, 那么计算好了之后如何告知其他的Member, 他们对应的分配状态呢?
当每个Member收到JoinGroup的回调之后, 他们会发起一个SyncGroupRequest, 其中Leader Member就会把刚刚计算好的分配策略, 一起当做入参发起请求。请看下图

上面发起的请求也只是告知了组协调器(GroupCoordinator)分配的情况, 最终还是需要组协调器(GroupCoordinator)来告知每个Member的。
那么这个告知的过程就是所有Member都同步完成后的回调 ;
具体请看:KafkaConsumer SyncGroupRequest详解
上面所有的铺垫都讲解清楚了,那么目前Kafka支持哪些分配策略呢?
我们来一一分析一下
partition.assignment.strategy=]org.apache.kafka.clients.consumer.RangeAssignor
这也是默认的分配策略
它是以单个Topic为一个维度来计算分配的, 他只负责将每一个Topic的分区尽可能均衡的分配给消费者
消费组里面所有消费者(Member)按照字母排序, 给Topic的分区按照分区号排序。
先计算每个分区最少平均分配多少个分区数, 然后余下的逐个分 举个例子:Topic为Topic1 有11个分区;有3个消费者订阅 那么平均每个 11/3=3余2, 那么前面两个可以分到4个分区,最后一个分到3个;[ 4, 4, 3 ]
他们最终分配方式如下
| 消费者 |
|
| --- | --- |
| Member:client1-ba0ebe99-cd09-42e9-87b9-11b6f828bfca | Topic1-0, Topic1-1, Topic1-2, Topic1-3 |
| Member:client2-cbfb4cf2-c878-41d2-852c-86d56dbb99c2 | Topic1-4, Topic1-5, Topic1-6, Topic1-7 |
| Member:client3-ad60e7a5-204f-4741-b66f-3da3acb0a2f9 | Topic1-8, Topic1-9, Topic1-10 |
分配是先分完一个消费者再分配下一个的,跟遍历是有区别。clientId-1 先分到 [ 0 , 1 , 2 , 3 ] 号分区, 后面的接着分。[图片上传失败...(image-83ca39-1663553888622)]
图里面的Member就是消费者, 对消费组来说他内部的对象是Member
Range弊端
Range针对单个Topic的情况下显得比较均衡, 但是假如Topic很多的话, Member排序靠前的可能会比Member排序靠后的负载多很多。

看,像这种情况, 3个Member都订阅了这4个Topic, 可是Member这么多分区愣是没有分配到1个
把所有Member排序, 所有TopicPartition排序。轮训遍历分配

Member-3下线

RoundRobin的一些弊端
如果成员订阅的Topic不尽相同的时候, 最终结果也不可能会完全均衡的。

如果图中的Memner-3比另外两个多订阅了Topic-4,那他总共就消费了6个分区了, 但是另外两个分别只消费了2个分区。
如果这里的Member-3把分区 Topic2-0、Topic3-1 分给另外两个那才是最均衡的情况。
那么有什么策略能解决这个问题吗?接下来我们另外一个分区策略 -- 粘性分区
上面介绍的两种分区分配方式,多多少少都会有一些分配上的偏差, 而且每次重新分配的时候都是把所有的都重新来计算并分配一遍, 那么每次分配的结果都会偏差很多, 如果我们在计算的时候能够考虑上一次的分配情况,来尽量的减少分配的变动,不失为一种优化方案。
我们之前在讲生产者的时候也讲过粘性分区:
Kafka生产者的3种分区策略
那么消费者的粘性分区策略是什么样子的呢?
目标:
分区的分配尽量的均衡
每一次重分配的结果尽量与上一次分配结果保持一致
当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个目标才真正体现出StickyAssignor特性的。
首先, StickyAssignor粘性分区在进行分配的时候,是以RoundRobinAssignor的分配逻辑来计算的,但是它又弥补了RoundRobinAssignor的一些可能造成不均衡的弊端。
比如在讲RoundRobinAssignor弊端的那种case, 但是在StickyAssignor中就是下图的分配情况
把RoundRobinAssignor的弊端给优化了

体现粘性分区地方就在于重新分配的时候了, 还是上面的case(上图右边的StickAssignor), 假如 Member-2 离线了
粘性分区的计算方式把把离线的那个Member所属的分区分配给其他的Member, 在其他的Member已拥有的分区不变的前提下,尽量的均衡。
Member-2 有3个分区, 可以分两个分区给Member-1,分1个分区给Member-3 最终分配图如下:

上面分析的StickyAssignor粘性分区策略,主要作用是保证消费者客户端在重平衡之后能够维持原本的分配方案。
但是StickyAssignor还是属于 RebalanceProtocol.EAGER 协议, 重平衡的时候需要每个客户端都要先放弃当前持有的资源。
为了解决这个问题, 所以就有了 CooperativeStickyAssignor分配策略
你可以理解为 CooperativeStickyAssignor 的分配策略跟StickyAssignor的策略差不多。
但是它在此基础上是用的RebalanceProtocol.COOPERATIVE协议。渐进式的重平衡。
后续专门写一篇文章来讲解一下这一块内容,挖个坑0.0
我们先看一下分区策略的类图

我们想要自定义分配策略,只需要实现接口:
public interface ConsumerPartitionAssignor {
/**
* 返回序列化后的自定义数据
*/
default ByteBuffer subscriptionUserData(Set<String> topics) {
return null;
}
/**
* 分区分配的计算逻辑
*/
GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);
/**
* 当组成员从领导者那里收到其分配时调用的回调
*/
default void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
}
/**
* 指明使用的再平衡协议
* 默认使用RebalanceProtocol.EAGER协议, 另外一个可选项为 RebalanceProtocol.COOPERATIVE
*/
default List<RebalanceProtocol> supportedProtocols() {
return Collections.singletonList(RebalanceProtocol.EAGER);
}
/**
* Return the version of the assignor which indicates how the user metadata encodings
* and the assignment algorithm gets evolved.
*/
default short version() {
return (short) 0;
}
/**
* 分配器的名字
* 例如 RangeAssignor、RoundRobinAssignor、StickyAssignor、CooperativeStickyAssignor
* 对应的名字为
* range、roundrobin、sticky、cooperative-sticky
*/
String name();
当然我们也可以根据自己的需求来实现其他的抽象类
比如:AbstractStickyAssignor抽象类就是专门给粘性分区使用的抽象类
上面我们讲的是分区策略, 但是分区策略本质上又分为两大类
这两个区别是
EAGER 重新平衡协议要求消费者在参与重新平衡事件之前始终撤销其拥有的所有分区。因此,它允许完全改组分配
COOPERATIVE协议允许消费者在参与再平衡事件之前保留其当前拥有的分区。分配者不应该立即重新分配任何拥有的分区,而是可以指示消费者需要撤销分区,以便可以在下一次重新平衡事件中将被撤销的分区重新分配给其他消费者
COOPERATIVE协议将一次全局重平衡,改成每次小规模重平衡,直至最终收敛平衡的过程。
COOPERATIVE有效的改进来在此之前EAGER协议重平衡而触发的stop-the-world(STW)
我们上面讲的分配策略3种策略都是 RebalanceProtocol.EAGER 协议
而CooperativeStickyAssignor分配策略是使用的 RebalanceProtocol.COOPERATIVE协议
关于更多的关于重平衡协议的讲解,请看: Kafka 重平衡的两种协议讲解
通过rubykoans.com,我在about_array_assignment.rb中遇到了这两段代码你怎么知道第一个是非并行赋值,第二个是一个变量的并行赋值?在我看来,除了命名差异之外,代码几乎完全相同。4deftest_non_parallel_assignment5names=["John","Smith"]6assert_equal["John","Smith"],names7end45deftest_parallel_assignment_with_one_variable46first_name,=["John","Smith"]47assert_equal'John
rpartition和partition有什么区别?我已经阅读了文档,但我认为它们是一样的。只是那些出现在后来的ruby版本中吗? 最佳答案 以下示例将有助于识别差异:"abccba".partition("b")#=>["a","b","ccba"]"abccba".rpartition("b")#=>["abcc","b","a"]所以区别在于rpartition搜索最右边的匹配项,而不是最左边的匹配项。 关于Rubyrpartition与分区?,我们在StackOverflow
我早就知道Ruby中的“常量”(即大写的变量名)不是真正常量。与其他编程语言一样,对对象的引用是唯一存储在变量/常量中的东西。(侧边栏:Ruby确实具有“卡住”引用对象不被修改的功能,据我所知,许多其他语言都没有提供这种功能。)所以这是我的问题:当您将一个值重新分配给常量时,您会收到如下警告:>>FOO='bar'=>"bar">>FOO='baz'(irb):2:warning:alreadyinitializedconstantFOO=>"baz"有没有办法强制Ruby抛出异常而不是打印警告?很难弄清楚为什么有时会发生重新分配。 最佳答案
假设您在Ruby中执行此操作:ar=[1,2]x,y=ar然后,x==1和y==2。是否有一种方法可以在我自己的类中定义,从而产生相同的效果?例如rb=AllYourCode.newx,y=rb到目前为止,对于这样的赋值,我所能做的就是使x==rb和y=nil。Python有这样一个特性:>>>classFoo:...def__iter__(self):...returniter([1,2])...>>>x,y=Foo()>>>x1>>>y2 最佳答案 是的。定义#to_ary。这将使您的对象被视为要分配的数组。irb>o=Obje
我正在使用Dragonfly在Rails3.1应用程序上处理图像。我正在努力通过url将图像分配给模型。我有一个很好的表格:{:multipart=>true}do|f|%>RemovePicture?Dragonfly的文档指出:Dragonfly提供了一个直接从url分配的访问器:@album.cover_image_url='http://some.url/file.jpg'但是当我在控制台中尝试时:=>#ruby-1.9.2-p290>picture.image_url="http://i.imgur.com/QQiMz.jpg"=>"http://i.imgur.com/QQ
使用Paperclip,我想从这样的URL抓取图像:require'open-uri'user.photo=open(url)问题是我最后得到一个像“open-uri20110915-4852-1o7k5uw”这样的文件名。有什么方法可以更改user.photo上的文件名?作为一个额外的变化,Paperclip将我的文件存储在S3上,所以如果我可以在初始分配中设置我想要的文件名就更好了,这样图像就会上传到正确的S3key。像这样:user.photo=open(url),:filename=>URI.parse(url).path 最佳答案
这是我理想中想要的。用户做:a="hello"输出为Youjustallocated"a"!=>"Hello"顺序无关紧要,只要我能实现该消息即可。 最佳答案 不,没有直接的方法可以做到这一点,因为在执行代码之前,Ruby字节码编译器会丢弃局部变量名。YARV(MRI1.9.2中使用的RubyVM)提供的关于局部变量的唯一指令是getlocal和setlocal,它们都对整数索引进行操作,而不是变量名。以下是1.9.2源代码中insns.def的摘录:/****************************************
我想使用两种不同的protect_from_forgery策略构建一个Rails应用程序:一种用于Web应用程序,一种用于API。在我的应用程序Controller中,我有这行代码:protect_from_forgerywith::exception为了防止CSRF攻击,它工作得很好。在我的API命名空间中,我创建了一个继承self的应用程序Controller的api_controller,它是API命名空间中所有其他Controller的父类,我将上面的代码更改为:protect_from_forgery:null_session.遗憾的是,我在尝试发出POST请求时遇到错误:“
文章目录一基础定义二创建逻辑卷2-1准备物理设备2-2创建物理卷2-3创建卷组2-4创建逻辑卷2-5创建文件系统并挂载文件三扩展卷组和缩减卷组3-1准备物理设备3-2创建物理卷3-3扩展卷组3-4查看卷组的详细信息以验证3-5缩减卷组四扩展逻辑卷4-1检查卷组是否有可用的空间4-2扩展逻辑卷4-3扩展文件系统五删除逻辑卷5-1备份数据5-2卸载文件系统5-3删除逻辑卷5-4删除卷组5-5删除物理卷六LVM逻辑卷缩容6-1缩容注意事项6-2标准缩容步骤一基础定义LVM,LogicalVolumeManger,逻辑卷管理,Linux磁盘分区管理的一种机制,建立在硬盘和分区上的一个逻辑层,提高磁盘分
假设我有200个昂贵的方法调用(每个都有不同的参数)。出于某种原因,我可以并行执行其中的5个调用,但不能更多。我可以一次执行一个,但一次执行5个要快5倍。我想一直执行五件事。不想排五个,等五个都排完了,再排五个。如果我排队A、B、C、D、E并且C先完成,我想立即用F替换它,即使A和B还没有完成。我一直在研究这个问题,因为我可以想象它会定期发生。解决方案似乎是生产者-消费者模式,Ruby在其标准库中内置了一些用于该模式的结构(Queue和SizedQueue)。我玩过代码示例,阅读了一些文档,我想我对它有一个粗略的了解。但是我有一些问题我对我的解决方案没有信心,而且多线程的整个领域对我来