副本是分布式系统中对数据和服务提供的一种冗余方式。为了对外提供可用的服务,往往会对数据和服务进行副本处理。
Kafka 从 0.8 版本开始为分区引入了多副本机制,通过增加副本数量提升数据容灾能力。同时,Kafka通过多副本机制实现了故障自动转移,在 Kafka 集群中的某个节点失效的情况下仍然保证服务可用。从生产者发出的一条消息,首先会被写入分区的 Leader 副本,然后需要等待 ISR 集合中的所有 Follower 副本同步完成之后才能被认为已经提交,接着更新分区的 HW,进而消费者可以消费到这条消息。
副本:相对于分区而言的,即副本是特定分区的副本。一个分区包含一个或者多个副本,其中一个为 Leader 副本,其余为 Follower 副本,各个副本位于不同的 broker 节点上。只有 Leader 副本对外提供服务,Follower 副本只负责与 Leader 副本进行数据同步。
AR:分区中的所有副本的统称。
ISR:所有与 Leader 副本保持同步状态的副本集合(Leader 副本也是 ISR 集合的一员)。
LEO:标识每个分区的最后一条消息的下一个位置,分区的每个副本都有自己的 LEO。
HW:ISR 中最小的 LEO,俗称高水位,消费者只能拉取到 HW 之前的消息。
失效副本
处于失效状态(同步失效或者功能失效)的副本会被剥离出 ISR 集合,失效副本对应的分区称为失效分区,即 under-replicated 分区。
可以通过如下脚本命令查看失效分区:
bin/kafka-topics.sh --bootstrap-server 10.211.55.6:9092,10.211.55.7:9092,10.211.55.8:9092 --describe --topic thing1 --under-replicated-partitions
功能失效:比如副本下线。
同步失效:当 ISR 集合中的一个 Follower 副本滞后 Leader 副本的时间超过 broker 端参数 - replica.lag.time.max.ms(默认10000)就会判定为同步失效。 比如 Follower 副本的 I/O 开销过大导致 Follower 副本同步速度太慢,在一段时间内都无法追赶上 Leader 副本;频繁的 Full GC 导致进程卡住,在一段时间内没有向 Leader 副本发送同步请求。
ISR的伸缩与扩充
当检测到 ISR 集合中有失效副本时,就会收缩 ISR 集合。
ISR的收缩过程如下图:

1、Kafka启动的时候会开启两个与 ISR 相关的定时任务 - “isr-expiration”、“isr-change-propagation”。
2、“isr-expiration” 任务 每隔 replica.lag.time.max.ms / 2 (默认5000ms)检测每个分区是否需要缩减其 ISR 集合,当检测到 ISR 集合中有失效副本时,就会收缩 ISR 集合。如果某个分区的 ISR 集合发生变更,则将变更后的数据记录到 ZooKeeper 的 /brokers/topics/<topic>/partition/<partition>/state 节点中。此外,还会将变更后的记录缓存到 isrChangeSet 中。
3、“isr-change-propagation” 任务 每隔 2500ms 检查 isrChangeSet,如果发现 isrChangeSet 中有 ISR 集合的变更记录,则在 ZooKeeper 的 “/isr_change_notification” 路径下创建一个以 “isr_change” 开头的持久顺序节点,并将 isrChangeSet 中的信息保存到这个节点中。
4、Kafka的控制器为 “/isr_change_notification” 添加一个 Watcher,当这个节点有子节点发生变化时,会触发 Watcher 的处理,通知控制器更新元数据并向它管理的 Kafka Broker 节点发送更新元数据的请求,最后删除 /isr_change_notification 路径下已经处理过的子节点。
Note:为了避免频繁触发 Watcher 的处理影响 Kafka 控制器、其它broker节点、ZooKeeper 的性能,Kafka 添加了限定条件,当检测到分区的 ISR 集合发生变化时,还需要检查以下两个条件:
(1)上一次 ISR 集合发生变化距离现在已经超过5s。
(2)上一次写入 ZooKeeper 的时间距离现在已经超过60s。
满足以上两个条件之一才可以将 ISR 集合的变化写入 ZooKeeper 中。
当 Follower 副本不断与 Leader 副本进行数据同步,并最终追赶上 Leader 副本(当前 Follower 副本的 LEO 大于等于 Leader 副本的 HW)时,Follower 副本就有资格进入 ISR 集合。ISR 集合的扩充过程与收缩过程相似,这里不再展开分析。
副本更新LEO、HW的过程

生产者往 Leader 副本写入消息,消息被追加到 Leader 副本的本地日志,并且会更新 LEO。

之后 Follower 副本向 Leader 副本拉取消息,并且带有自身的 LEO 信息。

Leader 副本读取本地日志,返回给 Follower 副本对应的消息,并且带有自身的 HW 信息。
Follower 副本收到 Leader 副本返回的消息,会将消息追加到本地日志中,并且更新 LEO、HW。
Follower 副本更新 HW 的算法:比较当前 LEO 与 Leader 副本返回的 HW 的值,取较小值作为自己的 HW

Follower 副本再次请求拉取 Leader 副本中的消息,并且带有更新后的 HW 的值。
Leader 副本收到 Follower 的请求后,选择最小的 LEO 作为 HW。

Follower 副本收到 Leader 副本返回的消息,会接着将消息追加到本地日志中,并且更新 LEO、HW。
Leader Epoch 的引入
在 0.11.0.0 版本之前,Kafka 使用的是基于 HW 的同步方式。这种方式可能会出现 数据丢失 或者 Leader 副本和 Follower 副本数据不一致 的问题。

Follower 副本在更新 LEO 为 2 和 更新 HW 为 2 之间存在一轮的 FetchRequest/FetchResponse。如果在这个过程中,Follower 副本宕机了,那么重启后会根据之前记录的 HW(读取 replication-offset-checkpoint 文件)进行日志截断,那么会导致 b 这条消息被删除。然后 Follower 副本再向 Leader 副本拉取消息,如果此时 Leader 副本宕机并且 Follower 副本被选举为新的 Leader,接着原来的 Leader 副本恢复后就会成为 Follower 副本,由于 Follower 副本的 HW 不能比 Leader 副本的 HW 高,所以还会进行一次日志截断,由此 b 这条消息就丢失了。或者原来的 Leader 副本无法恢复,b 这条消息也是会丢失的。

如果 min.insync.replicas=1 的场景下,上述两个副本处于挂掉状态,Replica B 先恢复并成为 Leader 副本,接着写入消息 c 并更新 LEO、HW 为 2。此时 Replica A 也恢复过来了,成为 Follower 副本并且需要根据 HW 截断日志以及向 Leader 副本拉取数据,由于此时 Replica A 的 HW 也是 2,所以可以不做任何调整。如此一来,Replica A 和 Replica B 就会出现数据不一致的问题。
为了解决上述两个问题,Kafka 从0.11.0.0开始引入了 Leader Epoch 的概念,在需要截断数据的时候使用 Leader Epoch 作为参考依据。Leader Epoch 初始值为 0,代表 Leader 的纪元,每次 Leader 变更,该值都会加一。每个副本在本地的 leader-epoch-checkpoint 文件中记录 <LeaderEpoch => StartOffset> 信息。
StartOffset:当前 Leader Epoch 下的写入的第一条消息的偏移量

min.insync.replicas=1 的场景下,Replica A、Replica B 都处于挂掉的状态,Replica B 先恢复通过选举成为 Leader并更新 LE,然后写入 c 这条消息并更新 LEO、HW。这时 Replica A 恢复过来成为 Follower,向 Replica B 发送请求并携带自身的 LE。Replica B 接收到请求后,比较两者的 LE 发现不一致,然后返回 Replica B 当前 LE 对应的 StartOffset。Replica A 比较 Replica B 返回的 StartOffset 与自己的 LEO ,判断是否需要日志截断。


我是一名决定学习Ruby和RubyonRails的ASP.NETMVC开发人员。我已经有所了解并在RoR上创建了一个网站。在ASP.NETMVC上开发,我一直使用三层架构:数据层、业务层和UI(或表示)层。尝试在RubyonRails应用程序中使用这种方法,我发现没有关于它的信息(或者也许我只是找不到它?)。也许有人可以建议我如何在RubyonRails上创建或使用三层架构?附言我使用ruby1.9.3和RubyonRails3.2.3。 最佳答案 我建议在制作RoR应用程序时遵循RubyonRails(RoR)风格。Rails
我尝试用Ruby设计一个基于Web的应用程序。我开发了一个简单的核心应用程序,在没有框架和数据库的情况下在六边形架构中实现DCI范例。核心六边形中有小六边形和网络,数据库,日志等适配器。每个六边形都在没有数据库和框架的情况下自行运行。在这种方法中,我如何提供与数据库模型和实体类的关系作为独立于数据库的关系。我想在将来将框架从Rails更改为Sinatra或数据库。事实上,我如何在这个核心Hexagon中实现完全隔离的rails和mongodb的数据库适配器或框架适配器。有什么想法吗? 最佳答案 ROM呢?(Ruby对象映射器)。还有
这几天我一直在为这个问题苦苦挣扎。我有一个正在为其构建一些API的应用程序,并且上述错误总是在第一次运行时使我的应用程序崩溃。重新加载应用程序时错误消失,但仍然很烦人。以下是关于此错误的一些类似问题:AcopyofxxxhasbeenremovedfromthemoduletreebutisstillactiveArgumentError:AcopyofApplicationControllerhasbeenremovedfromthemoduletreebutisstillactive这两个链接都没有解决我面临的问题。这是完整的堆栈跟踪:ArgumentError(AcopyofAp
“架设一个亿级高并发系统,是多数程序员、架构师的工作目标。许多的技术从业人员甚至有时会降薪去寻找这样的机会。但并不是所有人都有机会主导,甚至参与这样一个系统。今天我们用12306火车票购票这样一个业务场景来做DDD领域建模。”开篇要实现软件设计、软件开发在一个统一的思想、统一的节奏下进行,就应该有一个轻量级的框架对开发过程与代码编写做一定的约束。虽然DDD是一个软件开发的方法,而不是具体的技术或框架,但拥有一个轻量级的框架仍然是必要的,为了开发一个支持DDD的框架,首先需要理解DDD的基本概念和核心的组件。一.什么是领域驱动设计(DDD)首先要知道DDD是一种开发理念,核心是维护一个反应领域概
一、解决痛点使用spring-kafka客户端,每次新增topic主题,都需要硬编码客户端并重新发布服务,操作麻烦耗时长。kafkaListener虽可以支持通配符消费topic,缺点是并发数需要手动改并且重启服务。对于业务逻辑相似场景,创建新主题动态监听可以用kafka-batch-starter组件二、组件能力1、新增topic名称为:auto.topic1(由于配置spring.kafka.consumer.prefix为auto,因此只有auto前缀的topic,才会被组件动态监听。)2、应用输出日志,监听到新增auto.topic1,并初始化客户端(主题刷新间隔为10s)3、发新的消
我正在使用Ruby-Tk为OSX开发一个桌面应用程序,我想为该应用程序提供一个AppleEvents接口(interface)。这意味着应用程序将定义它将响应的AppleScript命令的字典(对应于发送到应用程序的Apple事件),并且用户/其他应用程序可以使用AppleScript命令编写Ruby-Tk应用程序的脚本。其他脚本语言支持此类功能——Python通过位于http://appscript.svn.sourceforge.net/viewvc/appscript/py-aemreceive/的py-aemreceive库和Tcl通过位于http://tclae.source
Method#unbind返回对该方法的UnboundMethod引用,稍后可以使用UnboundMethod#bind将其绑定(bind)到另一个对象.classFooattr_reader:bazdefinitialize(baz)@baz=bazendendclassBardefinitialize(baz)@baz=bazendendf=Foo.new(:test1)g=Foo.new(:test2)h=Bar.new(:test3)f.method(:baz).unbind.bind(g).call#=>:test2f.method(:baz).unbind.bind(h).
我在当前项目中使用由Oracle数据库和memcached支持的RubyonRails。有一个非常常用的功能,它依赖于单个数据库View作为数据源,并且该数据源内部有其他数据库View和表。这是一个虚拟数据库View,能够从一个地方访问所有内容,而不是物化数据库View。大多数情况下,如果用户正在使用他们希望更新的功能,那么让数据保持最新很重要。从这个View获取数据时,我将安全表内部连接到View(安全表不是View本身的一部分),其中包含一些我们用来在更细粒度级别上控制数据访问的字段。例如,安全表有user_id,prop_1,prop_2列,其中prop_1,prop_2是数据库
ActiveRecord安全supportsdup,但mongoid似乎无法正确处理它。我想做以下事情:x=MyModel.newx.savey=x.dupy.savey应该是一个全新的对象,这样:x!=yx.id!=y.id 最佳答案 试试这个:x=Item.newx.savey=x.cloney.save它应该更改_id并复制所有其他字段。我注意到这似乎不适用于嵌入式文档。对于原始文档中的每个嵌入式文档,它会在克隆中使用新ID创建一个空白的嵌入式文档,但不会填充任何其他字段。如果使用嵌入式文档,最好编写自己的类方法。
我正在开发一个包含大约10个不同功能组件的Sinatra应用程序。我们希望能够将这些组件混合并匹配到应用程序的单独实例中,完全从config.yaml文件配置,如下所示:components:-route:'/chunky'component_type:FoodListercomponent_settings:food_type:baconmax_items:400-route:'places/paris'component_type:Mappercomponent_settings:latitude:48.85387273165654longitude:2.340087890625-