草庐IT

Kafka学习笔记

肯定不吃番茄啊 2023-04-13 原文

基础概念

Broker: 与RobbitMq类似,一个Kafka消息中间件节点就是一个Broker,一个或者多个Broker可以组成一个Kafka集群。
Producer: 消息的生产者,向Broker发送消息的客户端。
Topic: 主题,是每个消息的分类。每个发送到Kafka的消息都必须指明一个Topic,消费者消费时也必须指明要消费哪个Topic。
Partition: 分区。一个Topic可以有多个分区,分区内部的消息是有序的。
Consumer: 消息的消费者,向Broker拉取消息的客户端。
ConsumerGroup: 消费组。每个消费者进行消费时必须以消费组的形式进行消费,即使消费组里只有一个消费者。一条消息可以被多个ConsumerGroup消费,但是一个ConsumerGroup里只能有一个Consumer消费该消息。

Topic和消息日志log

对于每一个Topic,可以同时分为多个partition日志文件:

partition存储着message,内部是有序的。message到达partition时会按顺序添加到commit log中,每条message都有一个唯一的offset。
每个partition都有一个commit log,一个partition中的message的offset是唯一的,但是在不同partition的message的offset有可能是相同的。
kafka一般不会删除消息,不管消息有没有被消费。kafka一般是通过配置在配置文件里的日志保留时间(log.retention.hours),决定日志的最长保留时间,一般默认保留一周。
kafka的性能跟日志文件大小没有关系,所以保留大量的日志消息不会对kafka的性能产生什么影响。

consumer进行消费时是基于维护在commit log的offset实现的。在kafka中,consumer会自己维护offset,消息完一条消息offser就+1,或者通过指定对应的offser或者时间offset进行跳过某几条消息或者重复消费某几条消息。
这意味着consumer的消费情况都由consumer本身进行维护,对于kafka的性能消耗是非常小的,增加或者减少consumer都不会对kafka集群或者consumer产生影响,因为offser是consumer各自维护的。

partition

一般情况下,partitions都分布在不同的broker下,每个broker可以请求备份其他broker下的partition下的数据。
同时,也可以通过配置,指定每个partition的副本数量。对于每个partition,都有一个broker起到leader作用,follower的数量可以是0个或者多个。并且只有leader才能接收针对这个partition消息的读写请求,其他follower只能被动的从leader同步数据,不提供读写。如果leader挂了,那么会选举其中一个follower成为leader。

ISR: In-Sync Replicas (同步副本集)。指的是与leader保持数据同步的副本的集合。当ledaer挂时,会从ISR中选出一个当folloer。如果ISR中的broker没有及时从ledaer中同步数据,就会从ISR中剔除。

消息传递模式

传统的消息中间件消息传递模式有2种,对于kafka的实现模式:

  • Queue:多个consumer同时拉取消息,一条消息只能被一个consumer消费。
    将所有的consumer配置在一个ConsumerGroup下。
  • Publish-Subscribe模式:所有订阅该Topic的consumer都能收到消息
    将所有的consumer都分配一个独立的ConsumerGroup。

kafka设计原理

核心总控制器 Controller

当kafka集群启动时,它会从集群中的多个broker选出一个broker作为controller,负责管理整个集群中所有分区和副本的状态:

  • 当某个topic下的某个分区的leader故障,controller负责选举出新的leader;
  • 当某个分区的ISR集合中的数据发生变化时,controller负责将相关的元信息同步到所有的broker中;
  • 当某个topic的分区数量增加时,controller负责将该新分区被其他的broker感知到。
Controller选举和工作机制:

整个Controller的选举和工作机制都是基于Zookeeper实现的。
选举机制: 当kafka集群启动时,所有的broker都会尝试在zookeeper上创建一个 /controller 临时节点,zookeeper会保证只有一个broker节点创建成功,创建成功的broker节点就成为了核心控制器Controller。
其他没有成功创建 /controller 的broker会监听这个临时节点,当充当controller角色的borker节点宕机,那么 /controller 就会销毁,其他监听的broker会再次争抢创建临时节点,最后成功创建的成为了controller,这就是选举机制。

工作机制:

  • 监听broker: 监听 zookeeper 中的 /brokers/ids/节点,处理broker数量的变化;
  • 监听topic: 监听zookeeper中的 /brokers/topics,处理topics数量的变化;
  • 监听partition: 监听zookeeper中的 /brokers/topics/[topic],处理partition数量的变化;
  • 从zookeeper获取所有与brokers,topics和partition相关得到消息并进行管理。

Partition副本选举Leader机制

如果某个partition的leader挂了,那么controller会从ISR列表中选举出leader。kafka会优先挑选ISR列表中挑选第一个broker作为leader。因为一个leader最先放入ISR,可能同步的数据最多。如果配置了 unclean.leader.election.enable = true ,那么也可以从ISR以外的broker中挑选出leader。
follower进入ISR的条件:

  • follower不能产生网络分区,必须与leader保持网络联通并且与zookeeper保持会话;
  • follower能同步leader的所有写操作,至少不能落后太多。落后的多少是由 replica.lag.time.max.ms 决定的,如果超过这个时间未与leader进行同步,则会被剔除ISR。

consumer的offset记录机制

每个consumer会定期将各自的offset发送到 kafka 的内部 topic : _consumer_offsets,对应的 key 是 consumerGroupId + topic + 分区号,value 就是当前 offset 的值。consumer宕机后重新启动也是根据该key获取对应的offset。kafka 会定期清理这部分数据,只保留最新的。
从这里可以看出,_consumer_offsets可能会有很高的并发量,所以默认给其50个分区(可通过offsets.topic.num.partitions配置)。

消费者rebalance机制

如果某个topic内consumerGroup里的consumer数量或者partition数量发生了变化,这时会触发 kafka 的 rebalance机制。例如某个consumer宕机,此时会将它所负责的partition分给其他消费者,当它状态恢复正常后,又会将他负责的partition还给他。
rebalance机制只会在subscribe这种不指定分区的情况下触发,如果是assign这种指定分区的情况下不会触发。
以下几种情况会触发rebalance:

  • consumerGroup 里的 consumer 数量增加或者减少;
  • topic增加了分区;
  • 消费组订阅了更多topic。
    需要注意的是,处于rebalance的kafka集群,consumer无法从kafaka中消费消息,所以应当选择在流量较低的时间触发执行rebalance。
    Rebalance策略
    range策略:例如有10个broker,3个consumer,第一个consumer消费 1-4,第二个消费 5-7,第三个消费 8-10;
    round-robin:轮询策略;
    sticky:与round-robin类似,但是rebalance时会尽量不去破坏原有的分配规则。

HW与LEO

LEO: log-end-offset 指的是副本中的日志文件下一条消息的offset,这个offset值目前是没有消息的。
HW:HighWatermark (高水位)。取得是partition中 ISR最小的LEO值。
consumer最多只能消费到 HW 的位置。另外,在每个副本集中,leader 和 follower 都需要维护自己的 HW 状态。当 producer 发送消息到leader 时,consumer不能立即消费到这条消息。leader 首先更新自己的LEO,然后 follower 从 leader 中同步后也更新自己的LEO值,等到所以的 follower 都更新完毕,那么副本集中的 HW 就会更新,此时消费者才能消费到这条消息。这么做的好处是:如果 leader 宕机,consumer 依然可以消费到这条消息。

日志分段存储

在 kafka 中,日志是以分段的形式存储的。一个 partition 中的所有的消息都放在一个以 topic + 分区号 命名的文件夹下存储。消息存储的格式如下:

00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex

00000000000005367851.index
00000000000005367851.log
00000000000005367851.timeindex

00000000000009936472.index
00000000000009936472.log
00000000000009936472.timeindex

以 “.index” 为结尾的文件,文件名就代表这段日志文件中起始offset的位置;
以 “.timeindex” 为结尾的文件,kafka每发送 4k 大小的消息到分区,就会记录一条当前消息的时间戳与对应的offset位置;
以 “.log” 为结尾的文件,存放者具体的消息,包括 offset 和消息体。
kafka 中默认每个日志段大小是 1G(可通过log.segment.bytes配置)。

kafka的重要参数与问题总结

一些比较重要的参数:

producer
  • acks:
    (1) acks = 0。表示 producer不需要等待任何 broker 确认收到消息的回复,就可以发送下一条消息。这种性能最高,但是最容易丢消息;
    (2) acks = 1。表示 producer 需要等待 leader 确认将消息写入 log 的回复,但是不需要等待所有的 follower 都成功写入,就可以继续发送下一条消息。这种情况下,follower 还未及时同步消息,leader 又宕机了,那么就会丢消息。
    (2) acks = -1/all。表示 producer 需要等待 leader 和 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数的配置的副本个数成功写入log的回复,才能发送下一条消息。这种策略会保证只要有一个 follower 存活就不会丢失数据。
  • retries:消息发送失败时的重试次数。重试能保证消息发送的可靠性,但可能会导致消息重复发送。
  • retry.backoff.ms:消息发送失败时的重试间隔,默认是100ms。
  • buffer.memory:消息发送的本地缓冲区,如果设置了该值,那么消息会先发送到缓冲区中,在一定程度上可以提高性能。默认值是33554432,即32MB。
  • batch.size: kafka本地线程会从缓冲去取数据,批量发送到broker。该值可以设置批量发送数据的大小,默认是16384,即16kb。
  • linger.ms: 默认值是0,表示消息立即必须被发送。消息发送完后会放入一个本地的batch,如果设置了该值,在指定的时间内达到 batch.size 的大小,那么就会随着这个batch发送出去;如果没未达到,这条消息也会被发送出去。目的是不让消息发送的延时时间太长。
  • key.serializer:指定key从字符串序列化到字节数组的序列化器。
  • value.serializer:指定value从字符串序列化到字节数组的序列化器。
consumer
  • group.id: 消费者所属的消费组id
  • enable.auto.commit:是否由kafka自动提交offset。
  • auto.offset.reset:当消费主题的是一个新的消费组;或者指定offset的消费方式,但是offset不存在,消费者如何消费。
    (1)latest(默认): 只消费自己启动之后发送到主题的消息。
    (2)earliest: 第一次从头开始消费,以后按照offset 的记录继续消费。
  • heartbeat.interval.ms: 消费者发送到broker的心跳间隔时间。需要注意的是,如果发生了rebalance ,那么broker会通过心跳响应将rebalance方案。
  • session.timeout.ms:broker多长没有收到consumer的心跳消息就将consumer从消费组踢出,默认是10秒。
  • max.poll.records:一次性从consumer拉取消息的最大条数。
  • max.poll.interval.ms:如果两次拉取消息的间隔时间超过这个时间,那么broker认为这个consumer处理能力太弱,将该consumer提出消费组。
  • key.deserializer:指定key的反序列化器。
  • value.serializer:指定value的反序列化器。

问题及优化方案

1. 消息丢失:

发送端:
(1) acks = 0。表示 producer不需要等待任何 broker 确认收到消息的回复,就可以发送下一条消息。这种性能最高,但是最容易丢消息;
(2) acks = 1。表示 producer 需要等待 leader 确认将消息写入 log 的回复,但是不需要等待所有的 follower 都成功写入,就可以继续发送下一条消息。这种情况下,follower 还未及时同步消息,leader 又宕机了,那么就会丢消息。
(2) acks = -1/all。表示 producer 需要等待 leader 和 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数的配置的副本个数成功写入log的回复,才能发送下一条消息。这种策略会保证只要有一个 follower 存活就不会丢失数据。

消费端:
将 auto.offset.reset 设置为 false,不自动提交offset。只有当业务代码成功执行完毕,再提交offser,避免消息丢失。

2. 消息重复消费

发送端: 如果配置了重试机制,可能因为网络原因会造成消息的重复提交。
消费端: 消费端如果是手动提交offset,可能出现业务代码执行完毕,但offset还没提交,下一次拉取就会消费重复的数据。
解决办法是,消费端的消费接口保证幂等性

3.消息乱序

如果在消息发送端配置超时机制,那么就有可能出现,发送了1,2,3条消息,但是第一条出现网络问题导致落后于后面两条消息,从而导致消息到达broker后的顺序变成了2,3,1。
解决办法:
(1). 将发送消息的发送方式改成同步,保证一条消息发送成功后再发送下一条;或者直接关闭重试机制。这两种方案都会导致发送端的性能降低。
(2). 如果想要保证消息全链路的消费有序,需要保证有序的消息全部发送到topic的一个分区内,并且只能有一个consumer进行消费。或者消费者接收到消息后,将需要保证有序的消息放入到内存队列里,再开启一个线程去消费。

4.消息积压

(1) 由于发送端发送速率过快,或者消费端消费能力太低,导致消息挤压。
这个时候可以启动新的消费者,把旧的消费者暂时关掉。新的消费者程序不处理具体业务,而是将挤压的消息转发到其他新的topic下,这个topic可以设置很多分区,然后启动多台消费者去消费新的topic。
(2) 由于消息格式有误或者消费程序出现bug,导致消费者消费消息一直不成功。
可以将这些消费不成功的消息取出放入数据库或者磁盘中(类似死信队列),后面再慢慢分析消费失败的原因。

5.延时队列

实现方案:
发送端将消息按照不同的延迟时间发送到不同的topic下,如 topic_1s,topic_5s,topics_1min… ,然后开启定时器去轮询这些topic下的消息,如果时间到了就将这些消息转发到具体业务处理的topic下。

kafka高性能的原因

  1. kafka 写日志时每次都是在文件末尾追加,因为kafka的消息不能修改或者删除保证了磁盘的顺序读,不会出现文件的随机写的情况;
  2. 数据传输的零拷贝;
  3. 消息发送和消费的批量处理和压缩传输。

零拷贝:

  1. 减少了内核到用户空间,用户空间到内核的两次拷贝;
  2. 减少了内核与用户空间的上下文切换。

有关Kafka学习笔记的更多相关文章

  1. LC滤波器设计学习笔记(一)滤波电路入门 - 2

    目录前言滤波电路科普主要分类实际情况单位的概念常用评价参数函数型滤波器简单分析滤波电路构成低通滤波器RC低通滤波器RL低通滤波器高通滤波器RC高通滤波器RL高通滤波器部分摘自《LC滤波器设计与制作》,侵权删。前言最近需要学习放大电路和滤波电路,但是由于只在之前做音乐频谱分析仪的时候简单了解过一点点运放,所以也是相当从零开始学习了。滤波电路科普主要分类滤波器:主要是从不同频率的成分中提取出特定频率的信号。有源滤波器:由RC元件与运算放大器组成的滤波器。可滤除某一次或多次谐波,最普通易于采用的无源滤波器结构是将电感与电容串联,可对主要次谐波(3、5、7)构成低阻抗旁路。无源滤波器:无源滤波器,又称

  2. CAN协议的学习与理解 - 2

    最近在学习CAN,记录一下,也供大家参考交流。推荐几个我觉得很好的CAN学习,本文也是在看了他们的好文之后做的笔记首先是瑞萨的CAN入门,真的通透;秀!靠这篇我竟然2天理解了CAN协议!实战STM32F4CAN!原文链接:https://blog.csdn.net/XiaoXiaoPengBo/article/details/116206252CAN详解(小白教程)原文链接:https://blog.csdn.net/xwwwj/article/details/105372234一篇易懂的CAN通讯协议指南1一篇易懂的CAN通讯协议指南1-知乎(zhihu.com)视频推荐CAN总线个人知识总

  3. 深度学习部署:Windows安装pycocotools报错解决方法 - 2

    深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal

  4. ruby - 我正在学习编程并选择了 Ruby。我应该升级到 Ruby 1.9 吗? - 2

    我完全不是程序员,正在学习使用Ruby和Rails框架进行编程。我目前正在使用Ruby1.8.7和Rails3.0.3,但我想知道我是否应该升级到Ruby1.9,因为我真的没有任何升级的“遗留”成本。缺点是什么?我是否会遇到与普通gem的兼容性问题,或者甚至其他我不太了解甚至无法预料的问题? 最佳答案 你应该升级。不要坚持从1.8.7开始。如果您发现不支持1.9.2的gem,请避免使用它们(因为它们很可能不被维护)。如果您对gem是否兼容1.9.2有任何疑问,您可以在以下位置查看:http://www.railsplugins.or

  5. ruby - 我如何学习 ruby​​ 的正则表达式? - 2

    如何学习ruby​​的正则表达式?(对于假人) 最佳答案 http://www.rubular.com/在Ruby中使用正则表达式时是一个很棒的工具,因为它可以立即将结果可视化。 关于ruby-我如何学习ruby​​的正则表达式?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/1881231/

  6. 深度学习12. CNN经典网络 VGG16 - 2

    深度学习12.CNN经典网络VGG16一、简介1.VGG来源2.VGG分类3.不同模型的参数数量4.3x3卷积核的好处5.关于学习率调度6.批归一化二、VGG16层分析1.层划分2.参数展开过程图解3.参数传递示例4.VGG16各层参数数量三、代码分析1.VGG16模型定义2.训练3.测试一、简介1.VGG来源VGG(VisualGeometryGroup)是一个视觉几何组在2014年提出的深度卷积神经网络架构。VGG在2014年ImageNet图像分类竞赛亚军,定位竞赛冠军;VGG网络采用连续的小卷积核(3x3)和池化层构建深度神经网络,网络深度可以达到16层或19层,其中VGG16和VGG

  7. 机器学习——时间序列ARIMA模型(四):自相关函数ACF和偏自相关函数PACF用于判断ARIMA模型中p、q参数取值 - 2

    文章目录1、自相关函数ACF2、偏自相关函数PACF3、ARIMA(p,d,q)的阶数判断4、代码实现1、引入所需依赖2、数据读取与处理3、一阶差分与绘图4、ACF5、PACF1、自相关函数ACF自相关函数反映了同一序列在不同时序的取值之间的相关性。公式:ACF(k)=ρk=Cov(yt,yt−k)Var(yt)ACF(k)=\rho_{k}=\frac{Cov(y_{t},y_{t-k})}{Var(y_{t})}ACF(k)=ρk​=Var(yt​)Cov(yt​,yt−k​)​其中分子用于求协方差矩阵,分母用于计算样本方差。求出的ACF值为[-1,1]。但对于一个平稳的AR模型,求出其滞

  8. Unity Shader 学习笔记(5)Shader变体、Shader属性定义技巧、自定义材质面板 - 2

    写在之前Shader变体、Shader属性定义技巧、自定义材质面板,这三个知识点任何一个单拿出来都是一套知识体系,不能一概而论,本文章目的在于将学习和实际工作中遇见的问题进行总结,类似于网络笔记之用,方便后续回顾查看,如有以偏概全、不祥不尽之处,还望海涵。1、Shader变体先看一段代码......Properties{ [KeywordEnum(on,off)]USL_USE_COL("IsUseColorMixTex?",int)=0 [Toggle(IS_RED_ON)]_IsRed("IsRed?",int)=0}......//中间省略,后续会有完整代码 #pragmamulti_c

  9. Tcl脚本入门笔记详解(一) - 2

    TCL脚本语言简介•TCL(ToolCommandLanguage)是一种解释执行的脚本语言(ScriptingLanguage),它提供了通用的编程能力:支持变量、过程和控制结构;同时TCL还拥有一个功能强大的固有的核心命令集。TCL经常被用于快速原型开发,脚本编程,GUI和测试等方面。•实际上包含了两个部分:一个语言和一个库。首先,Tcl是一种简单的脚本语言,主要使用于发布命令给一些互交程序如文本编辑器、调试器和shell。由于TCL的解释器是用C\C++语言的过程库实现的,因此在某种意义上我们又可以把TCL看作C库,这个库中有丰富的用于扩展TCL命令的C\C++过程和函数,所以,Tcl是

  10. ruby-on-rails - 这个 C 和 PHP 程序员如何学习 Ruby 和 Rails? - 2

    按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visitthehelpcenter指导。关闭9年前。我来自C、php和bash背景,很容易学习,因为它们都有相同的C结构,我可以将其与我已经知道的联系起来。然后2年前我学了Python并且学得很好,Python对我来说比Ruby更容易学。然后从去年开始,我一直在尝试学习Ruby,然后是Rails,我承认,直到现在我还是学不会,讽刺的是那些打着简单易学的烙印,但是对于我这样一个老练的程序员来说,我只是无法将它

随机推荐