草庐IT

Kafka中Zookeeper的作用

一只慵懒的猫z 2023-04-11 原文

一、Kafka架构图

上篇博客主要讲了Kafka的基本概念,这里不做过多介绍。可以参考 Kafka入门学习

那么Zookeeper在其中主要起到什么作用呢?


二、 Zookeeper 在 kafka 中的具体作用

它是一个分布式协调框架。很好的将消息生产、消息存储、消息消费的过程结合在一起。在典型的Kafka集群中, Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息.

2.1 Broker注册

Broker是分布式部署并且相互之间相互独立,但是需要有一个注册中心对整个集群的Broker进行管理,此时就使用了Zookeeper。在Zookeeper上会有一个专门用来记录Broker服务器列表的节点:/brokers/ids

每个Broker在启动时,都会在Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0…N]。

Kafka使用了全局唯一的数字来指代每个Broker服务器,不同的Broker必须使用不同的Broker ID进行注册,创建完节点后,每个Broker就会将自己的IP地址和端口信息记录到该节点中去。其中,Broker创建的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。这样,我们就可以很方便的监控到Broker节点的变化,及时调整负载均衡等。

2.2 Topic注册

在kafka中,用户可以自定义多个topic,每个topic又被划分为多个分区,每个分区存储在一个独立的broker上。这些分区信息及与Broker的对应关系都是由Zookeeper进行维护

在zookeeper中,建立专门的节点来记录这些信息,其节点路径为/brokers/topics/{topic_name}。并且topic创建的节点类型也是临时节点

2.3 生产者负载均衡

同一个Topic消息会被分区并将其分布在多个Broker上。由于每个Broker启动时,都会在Zookeeper上进行注册,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡。

2.4 消费者负载均衡

与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。
每个消费者都需要关注所属消费者分组中其他消费者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。还对Broker服务器变化注册监听。消费者需要对/broker/ids/[0-N]中的节点进行监听,如果发现Broker服务器列表发生变化,那么就根据具体情况来决定是否需要进行消费者负载均衡。

2.5 分区与消费者的关系

消费者组 Consumer group 下有多个 Consumer(消费者)。
对于每个消费者组 (Consumer Group),Kafka都会为其分配一个全局唯一的Group ID,Group 内部的所有消费者共享该 ID。订阅的topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)。同时,Kafka为每个消费者分配一个Consumer ID。
在Kafka中,规定了每个消息分区 只能被同组的一个消费者进行消费,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:
/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]
其中,[broker_id-partition_id]就是一个 消息分区 的标识,节点内容就是该消息分区上消费者的Consumer ID。

2.6 记录消息消费的进度Offset

在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

节点内容就是Offset的值。

2.7 消费者注册

注册新的消费者分组

当新的消费者组注册到zookeeper中时,zookeeper会创建专用的节点来保存相关信息,其节点路径为 /consumers/{group_id},其节点下有三个子节点,分别为[ids, owners, offsets]。

ids节点:记录该消费组中当前正在消费的消费者;

owners节点:记录该消费组消费的topic信息;

offsets节点:记录每个topic的每个分区的offset;

注册新的消费者

当新的消费者注册到zookeeper中时,会在/consumers/{group_id}/ids节点下创建临时子节点,并记录相关信息

有关Kafka中Zookeeper的作用的更多相关文章

  1. ruby-on-rails - 如果 Object::try 被发送到一个 nil 对象,为什么它会起作用? - 2

    如果您尝试在Ruby中的nil对象上调用方法,则会出现NoMethodError异常并显示消息:"undefinedmethod‘...’fornil:NilClass"然而,有一个tryRails中的方法,如果它被发送到一个nil对象,它只返回nil:require'rubygems'require'active_support/all'nil.try(:nonexisting_method)#noNoMethodErrorexceptionanymore那么try如何在内部工作以防止该异常? 最佳答案 像Ruby中的所有其他对象

  2. ruby-on-rails - "assigns"在 Ruby on Rails 中有什么作用? - 2

    我目前正在尝试学习RubyonRails和测试框架RSpec。assigns在此RSpec测试中做什么?describe"GETindex"doit"assignsallmymodelas@mymodel"domymodel=Factory(:mymodel)get:indexassigns(:mymodels).shouldeq([mymodel])endend 最佳答案 assigns只是检查您在Controller中设置的实例变量的值。这里检查@mymodels。 关于ruby-o

  3. ruby - 字符串文字前面的 * 在 ruby​​ 中有什么作用? - 2

    这段代码似乎创建了一个范围从a到z的数组,但我不明白*的作用。有人可以解释一下吗?[*"a".."z"] 最佳答案 它叫做splatoperator.SplattinganLvalueAmaximumofonelvaluemaybesplattedinwhichcaseitisassignedanArrayconsistingoftheremainingrvaluesthatlackcorrespondinglvalues.Iftherightmostlvalueissplattedthenitconsumesallrvaluesw

  4. ruby - 为什么这个 eval 在 Ruby 中不起作用 - 2

    你能解释一下吗?我想评估来自两个不同来源的值和计算。一个消息来源为我提供了以下信息(以编程方式):'a=2'第二个来源给了我这个表达式来评估:'a+3'这个有效:a=2eval'a+3'这也有效:eval'a=2;a+3'但我真正需要的是这个,但它不起作用:eval'a=2'eval'a+3'我想了解其中的区别,以及如何使最后一个选项起作用。感谢您的帮助。 最佳答案 您可以创建一个Binding,并将相同的绑定(bind)与每个eval相关联调用:1.9.3p194:008>b=binding=>#1.9.3p194:009>eva

  5. ruby-on-rails - Spring 不起作用。 [未初始化常量 Spring::SID::DL] - 2

    我无法运行Spring。这是错误日志。myid-no-MacBook-Pro:myid$spring/Users/myid/.rbenv/versions/1.9.3-p484/lib/ruby/gems/1.9.1/gems/spring-0.0.10/lib/spring/sid.rb:17:in`fiddle_func':uninitializedconstantSpring::SID::DL(NameError)from/Users/myid/.rbenv/versions/1.9.3-p484/lib/ruby/gems/1.9.1/gems/spring-0.0.10/li

  6. ruby-on-rails - Simple_form 必填字段不起作用 - Ruby on Rails - 2

    我在RoR应用程序中有一个提交表单,是使用simple_form构建的。当字段为空白时,应用程序仍会继续下一步,而不会提示错误或警告。默认情况下,这些字段应该是required:true;但即使手动编写也行不通。该应用有3个步骤:NewPost(新View)->Preview(创建View)->Post。我的Controller和View的摘录会更清楚:defnew@post=Post.newenddefcreate@post=Post.new(params.require(:post).permit(:title,:category_id))ifparams[:previewButt

  7. ruby-on-rails - Heroku Action 缓存似乎不起作用 - 2

    我一直在Heroku上尝试不同的缓存策略,并添加了他们的memcached附加组件,目的是为我的应用程序添加Action缓存。但是,当我在我当前的应用程序上查看Rails.cache.stats时(安装了memcached并使用dalligem),在执行应该缓存的操作后,我得到current和total_items为0。在Controller的顶部,我想缓存我有的Action:caches_action:show此外,我修改了我的环境配置(对于在Heroku上运行的配置)config.cache_store=:dalli_store我是否可以查看其他一些统计数据,看看它是否有效或我做错

  8. ruby-on-rails - Rake 预览在 Octopress 中不起作用 - 2

    我在我的机器上安装了ruby​​版本1.9.3,并且正在为我的个人网站开发一个octopress项目。我为我的gems使用了rvm,并遵循了octopress.org记录的所有步骤。但是我在我的rake服务器中发现了一些错误。这是我的命令日志。Tin-Aung-Linn:octopresstal$ruby--versionruby1.9.3p448(2013-06-27revision41675)[x86_64-darwin12.4.0]Tin-Aung-Linn:octopresstal$rakegenerate##GeneratingSitewithJekyllidenticals

  9. ruby - 比较运算符不起作用(在 erb View 中) - 2

    我是RubyonRails的新手,我正在尝试编写一个morethan表达式:5%>大于号不断抛出异常捕获错误。我不确定如何解决这个问题?编辑:这不是rails,也不是View,它是一个Ruby构造 最佳答案 使用5%>错误来自photo_limit而不是从Integer延伸类(猜测它真的是一个字符串),因此没有混合比较方法/s有关更多信息,请参阅:http://www.skorks.com/2009/09/ruby-equality-and-object-comparison/特别是你必须混入Comparable并定义方法。虽然在这

  10. Ruby 全局作用域 - 2

    在回答另一个问题时,我意识到下面的程序并没有完全按照我的想法去做。puts"test"self.puts"test"#=>privatemethod`puts'calledformain:Object(NoMethodError)异常让我感到惊讶,因为我一直认为顶级方法调用将由main对象实例解决,但事实似乎并非如此。谁是第一个电话的实际接收者?如何解决?这是仅适用于顶级范围内的方法调用的特殊规则吗? 最佳答案 这是一个gooddiscussion说的就是这个问题。顶级方法,由Kernel提供,自动包含在Object类中。这意味着内

随机推荐