草庐IT

kafka—消费者

王博1999 2023-08-15 原文

学习目录

一、消费者工作流程

消费者Consumer采用从broker中主动拉取数据,Kafka采用这种方式

  1. 生产者Producer向每一个分区的leader发送数据,follower主动跟leader同步数据保证数据的可靠性
  2. 消费者Consumer消费某一个分区的数据,一个消费者可以消费多个分区的数据
  3. 每个分区的数据只能有一个消费者组中的一个消费者消费,即同一个分区不能有消费者组中的两个消费者同时消费
  4. 每个消费者的offset(分区中数据的偏移量),由消费者保存在主题中。如果某台消费者宕机了(挂了)重启的之后通过offset得到以前消费数据的位置

二、消费者组

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同

特点:

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费
  • 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
  • 如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息
  • 消费者组之间互不影响。所有的消费者都属于某个消费者 组,即消费者组是逻辑上的一个订阅者

1.消费者组初始化流程

coordinator:辅助实现消费者组的初始化和分区的分配
coordinator节点选择 = groupid的hashcode值 % __consumer_offsets的分区数量
例如: groupid的hashcode值 = 1,__consumer_offsets为50,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset

  1. 所有者的消费者都会主动的向消费者发送请求加入消费者组当中
  2. coordinator从多个消费者中选择一个消费者作为leader(老大)
  3. coordinator将之前所有消费者信息发给leader,其中包括主题情况等
  4. 消费者leader会负责制度消费方案
  5. 消费者leader把消费方案发给coordinator
  6. coordinator把消费者方案分别发给各个消费者consumer

2.特殊情况☆☆☆☆☆

每3秒 每个消费者都会和coordinator保持心跳(默认3s),一旦超时超过45s(session.timeout.ms=45s),该消费者会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms5分钟)超过了五分钟,也会触发再平衡

再平衡:把挂掉的消费者的任务分配给其他消费者

3.消费者组详细消费流程

  1. 首先创建一个消费者网络连接的客户端去跟kafka集群进行交互,然后调用sendFetches方法用来抓取数据,进行初始化,需要设置以下参数
    (1)参数一:Fetch.min.bytes 每批次最小抓取数据大小 默认1字节,可以设置
    (2)参数二:fetch.max.wait.ms 每批数据未到最小抓取数据的大小的超时时间,默认为500ms
    (3)参数三:Fetch.max.bytes 每批次最
    大抓取大小,默认50M
  2. 再调用send方法发送请求,通过onSuccess回调方法,把对应的数据拉去回来,将一批一批的数据 放入消息队列queue中
  3. 消费者从队列中拉去数据,Max.poll.records一次拉取数据返回消息的最大条数,默认500条
  4. 生产者对数据进行序列号,那么消费者则对数据进行反序列化,通过拦截器,最后进行数据处理

三、快速入门

在 IDEA 中编写生产者和消费者程序,生产者往主题为first3中发送数据,消费者从主题为first3中拉去数据

注意:运行程序之前,需要启动zk和kafka集群

生产者 CustomProducer01 类

package com.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @author wangbo
 * @version 1.0
 */

/**
 * 异步发送,创建不带回调函数的API代码
 */
public class CustomProducer01 {
    public static void main(String[] args) {
        //配置
        Properties properties = new Properties();

        //连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");    //写两个节点是为了防止客户挂掉,另一个能够正常工作

        //指定对应的key和value的序列化类型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1.创建kafka生成对象
        // <String,String> 表示 k的数据类型,和v的数据类型
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 2.发送数据
        for (int i = 0; i<5;i++){
            //第一个参数为生产者的主题名,第二个生产者生产的数据value。还有其他配置选项
            kafkaProducer.send(new ProducerRecord<String, String>("first3","kafka"));
        }

        // 3.关闭资源
        kafkaProducer.close();
    }
}

消费者 CustomConsumer_01 类

package com.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @author wangbo
 * @version 1.0
 */

/**
 * 1. 启动集群的zk和kafka
 * 2. 运行CustomConsumer_01消费者消费数据
 * 3. 运行CustomProducer01生产者生产数据 注意主题要对上
 */

public class CustomConsumer_01 {
    public static void main(String[] args) {
        //配置
        Properties properties = new Properties();

        //连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop102:9092");    //多写一个,避免其中一台挂掉,保证数据的可靠性

        //反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //配置消费者组ID 可以任意起
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        //1.创建一个消费者 "","hello"
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        //2.订阅主题 first3
        ArrayList<String> topics = new ArrayList<String>();
        topics.add("first3");
        kafkaConsumer.subscribe(topics);

        //3.消费数据
        while (true){
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//每1秒拉取一批数据

            //循环打印消费的数据 consumerRecords.for
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

有关kafka—消费者的更多相关文章

  1. ruby - 在 Ruby 中实现生产者消费者模式 - 2

    假设我有200个昂贵的方法调用(每个都有不同的参数)。出于某种原因,我可以并行执行其中的5个调用,但不能更多。我可以一次执行一个,但一次执行5个要快5倍。我想一直执行五件事。不想排五个,等五个都排完了,再排五个。如果我排队A、B、C、D、E并且C先完成,我想立即用F替换它,即使A和B还没有完成。我一直在研究这个问题,因为我可以想象它会定期发生。解决方案似乎是生产者-消费者模式,Ruby在其标准库中内置了一些用于该模式的结构(Queue和SizedQueue)。我玩过代码示例,阅读了一些文档,我想我对它有一个粗略的了解。但是我有一些问题我对我的解决方案没有信心,而且多线程的整个领域对我来

  2. kafka如何动态消费新增topic主题 - 2

    一、解决痛点使用spring-kafka客户端,每次新增topic主题,都需要硬编码客户端并重新发布服务,操作麻烦耗时长。kafkaListener虽可以支持通配符消费topic,缺点是并发数需要手动改并且重启服务。对于业务逻辑相似场景,创建新主题动态监听可以用kafka-batch-starter组件二、组件能力1、新增topic名称为:auto.topic1(由于配置spring.kafka.consumer.prefix为auto,因此只有auto前缀的topic,才会被组件动态监听。)2、应用输出日志,监听到新增auto.topic1,并初始化客户端(主题刷新间隔为10s)3、发新的消

  3. 手把手教你搭建SpringCloud Alibaba之生产者与消费者 - 2

      SpringCloudAlibaba全集文章目录:零、手把手教你搭建SpringCloudAlibaba项目一、手把手教你搭建SpringCloudAlibaba之生产者与消费者二、手把手教你搭建SpringCloudAlibaba之Nacos服务注册中心三、手把手教你搭建SpringCloudAlibaba之Nacos服务配置中心四、手把手教你搭建SpringCloudAlibaba之Nacos服务集群配置五、手把手教你搭建SpringCloudAlibaba之Nacos服务持久化配置六、手把手教你搭建SpringCloudAlibaba之Sentinel实现流量实时监控七、手把手教你搭

  4. Zookeeper、Nacos、Dubbo、Kafka之间的关系 - 2

    1.Zookeeper  Zookeeper是 ApacheHadoop 的子项目,是一个树型的目录服务,支持变更推送,适合作为Dubbo服务的注册中心,工业强度较高。  Zookeeper的功能主要是它的树形节点来实现的。当有数据变化的时候或者节点过期的时候,会通过事件触发通知对应的客户端数据变化了,然后客户端再请求zookeeper获取最新数据,采用push-pull来做数据更新。服务注册和消费信息直接存储在zk树形节点上,集群下采用过半机制保证服务节点间一致性。 2.Nacos  Nacos是 Alibaba 公司推出的开源工具,用于实现分布式系统的服务发现与配置管理。Nacos是Dub

  5. Spark Kafka流媒体 - 如何确定批次的末端 - 2

    我使用Kafka流媒体从KAFKA主题中消费。(KafkaDirect流)此主题中的数据每5分钟从另一个来源到达。现在,我需要处理每5分钟后到达的数据,并将其转换为SparkDataFrame。现在,流是数据的连续流。我的问题是,如何确定我已经完成了在Kafka主题中加载的第一组数据的阅读?(以便我可以将其转换为数据框架并开始我的工作)我知道我可以提及某个数字的批处理间隔(在JavastreamingContext中),但是即使那样,我也永远无法确定源将数据将数据推到主题的时间。欢迎任何建议。看答案如果我正确理解您的问题,您希望不创建批处理,直到阅读5分钟的所有数据。开箱即用的Spark不会提

  6. javascript - React 16.3 Context API——提供者/消费者问题 - 2

    我一直在React16.3.1ContextAPI上做一些实验。我遇到了一些我无法理解的事情。我希望我能得到你的帮助。注意:问题已经解决,但不是我要找的解决方案。让我们首先对同一文件Index.js中的多个组件进行实验。importReact,{Component,createContext}from'react';const{Provider,Consumer}=createContext();classAppProviderextendsComponent{state={name:'Superman',age:100};render(){constincreaseAge=()=>{

  7. 流批一体计算引擎-4-[Flink]消费kafka实时数据 - 2

    Python3.6.9Flink1.15.2消费KafakaTopicPyFlink基础应用之kafka通过PyFlink作业处理Kafka数据1环境准备1.1启动kafka(1)启动zookeeperzkServer.shstart(2)启动kafkacd/usr/local/kafka/nohup./bin/kafka-server-start.sh./config/server.properties>>/tmp/kafkaoutput.log2>&1&或者./bin/kafka-server-start.sh-daemon./config/server0.properties(3)查看进

  8. javascript - Node 是否有办法消费/停止事件? - 2

    我正在寻找一种添加事件的方法,以便它们按顺序触发并可选通过。我想知道NodeAPI中是否有类似这样的东西,或者如果没有,是否有人知道一个像样的npm包可以完成这个:obj.on('event-A',function(){//logsomething()//consumeorstoptheevent}).on('event-A',function(){//thisneverfires}); 最佳答案 我刚刚编写了一个库(event-chains),它复制了EventEmitterAPI,并通过拒绝promise或调用this.stop

  9. 互联网快讯:小红书启动最严医美治理;极米投影产品受消费者肯定;小米手机小爱同学新增上课模式 - 2

    互联网快讯:小红书启动最严医美治理;极米投影产品受消费者肯定;小米手机小爱同学新增上课模式国内要闻浙江出招:大学生如果创业失败,贷款10万以下的由政府代偿;国家药监局:50批次不合格化妆品被立案调查;小红书启动最严医美治理;1月人民币全球支付份额升至3.2%创新高,保持全球第四;浙江除杭州外全面放开专科以上学历落户限制;国际油价突破每桶100美元,国内加满一箱油多花8元;江苏省委省政府成立“丰县生育八孩女子”事件调查组,彻查真相;科技通信1、中国科学副研究员蒋顺兴等与临沂大学等单位合作对中国鲲鹏翼龙的两件标本开展研究,依据食团中的食物残留,推测鲲鹏翼龙在不同年龄阶段都主要以燕辽生物群中的一种古

  10. javascript - RxJS:具有中止功能的生产者-消费者 - 2

    我在RxJS中遇到了一个特殊的生产者消费者问题:生产者缓慢地生产元素。消费者正在请求元素,通常必须等待生产者。这可以通过压缩生产者和请求流来实现:varproduce=getProduceStream();varrequest=getRequestStream();varconsume=Rx.Observable.zipArray(produce,request).pluck(0);有时请求会被中止。生成的元素应该只在未中止的请求后使用:produce:-------------p1-------------------------p2--------->request:--r1---

随机推荐