草庐IT

kafka保证数据有序性小结

学习爪哇 2023-04-08 原文

最近,项目中使用过kafka但是不太理解,然后各种搜博客补习。然后对kafka如何保证数据的有序性很感兴趣,于是乎,又疯狂找博客学习,现在可以说是小有心得,在这里记录一下,怕忘记。也作为给大家的一个分享。本文内容为集多家之长,根据自己的理解就诞生了这篇内容,开始。

 

自己在学习的过程中,看完博客结合自己理解的小结如下:

研究如何保障kafka消费的顺序性,宗旨就是通过将消息绑定到定向的分区或者队列来保证顺序性,通过增加分区或者线程来提升消费能力。

   1.要保证生产者发消息发的是顺序性的消息,这个好解决,发消息的时候指定一下key相同的key会发送到一个分区中,而分区时有序的在发消息的时候多个操作(下单,支付)保证顺序的话,保证这些操作在一个topic下的同一个分区,topic好指定,分区也好指定发送的时候指定key相同的key会去到同一个分区中,kafkaTemplate.send(topic,partition,key,data)有参数可以指定key。意思就是保证发消息的时候有序,发到同一个分区中,那么消费的时候也是有序的,因为分区可以理解为一个队列,消息先进先出。

2.单线程有序的话,发的时候有序你消费的时候直接消费也是有序的,但是这样当消息量很大的时候,单线程消费的肯定很慢,这个时候你只能扩展分区,扩展消费者(这个时候就是指定分区消费了),效率会高一些,但是高到一定程度的时候你还想扩展只能加机器了,做集群,这样成本就提升了

     3.第三种就是老板不让加机器,要从技术层面解决问题,那只能开多线程消费了,生产者还是按照key发消息不变,消费者拿到消息之后开个多线程进行消费,虽然是拿到消息的时候是有序的,但是线程有快慢,线程处理的速度不同,会导致有序性打乱了。这时候想办法解决这个问题,模仿kafka分区的方法,消费者拿到消息之后,消费之前把消息按照key的hash值取模,放到队列中,这个时候保证了队列中的消息有序

然后,让线程池消费队列中的消息即可。但是第三种没有跑通,还得思考。8.1号跑通了,搜了其他的文章,知道了是那个类实现了ApplicationRunner接口,重写public void run(ApplicationArguments args) {方法,在这个带参数的run方法中,开了两个线程去执行消费的任务,作用就是初始化内容的,实在applicaion的main方法执行完之后要立即执行这个ApplicationRunner接口的实现方法的run方法。在这个方法中可以初始化你想提前加载的信息。

理论知识结束,上代码:

实现顺序性原理:消息发送的时候保证有序,设置相同的key会把消息投递到同一个分区的topic中,再由一个消费者来消费该分区topic

topic: "topic_query_p3r1" 分配了三个partition分区

producer 投递顺序消息

   同一组行为设置相同的key,会把这组数据投递到同一分区topic中。

/**
     * 投递顺序性消息,根据用户id做取模推送到不同分区的topic中
     * 相同的key推送到同一分区中
	*  第一个参数:topic
	* 第二个参数:key
	* 第三个参数:发送的消息内容
	* 三个参数全部是sring类型
     */
    @RequestMapping("/kafka2")
    public String testKafka2() {
        for (int userId = 0; userId < 300; userId++) {
            kafkaTemplate.send("topic_query_p3r1", userId + "", "insert" + userId);
            kafkaTemplate.send("topic_query_p3r1", userId + "", "update" + userId);
            kafkaTemplate.send("topic_query_p3r1", userId + "", "delete" + userId);
        }
        return null;
    }

consumer 消费顺序消息

前提是生产者发消息的时候指定key了

方式1 - 直接进行消费

   因为投递的相同行为的消息是有序的,所以直接消费也不会有问题。

/**
     * 消费topic_query_p3r1主题,ConsumerGroupId1消费组
     */
    @KafkaListener(topics = "topic_query_p3r1", groupId = "ConsumerGroupId1")
    public void p3r2ConsumerGroupId0(ConsumerRecord<?, ?> consumer) throws InterruptedException {
        System.out.println("消费者A topic名称:" + consumer.topic() +
                ", key:" + consumer.key() +
                ", value:" + consumer.value() +
                ", 分区位置:" + consumer.partition() +
                ", 下标" + consumer.offset()+"    "+Thread.currentThread().getId());
        Thread.sleep(10);
}

方式2.1 - 一个消费者来指定具体分区进行消费

   指定具体分区来进行消费。

 /**
     * 消费者,解决消息顺序性
     * 注解参数:partitions=0表示:只消费该主题中0分区的数据。
     */
    @KafkaListener(topicPartitions = {@TopicPartition(topic = "topic_query_p3r1", partitions = {"0"})}, groupId = "ConsumerGroupId1")
    public void receive(ConsumerRecord<?, ?> consumer) {
        System.out.println("消费者C topic名称:" + consumer.topic() +
                ",key:" + consumer.key() + "," +
                ",value:" + consumer.value() + "," +
                "分区位置:" + consumer.partition() +
                ", 下标" + consumer.offset());
    }

方式2.2 - 多个消费者来指定不同分区进行消费。

   写多个消费者方法来分别指向不同分区,提高消费速度,但是此方法不灵活。

/**
     * 消费0分区的topic_query_p3r1主题消费者,ConsumerGroupId1消费组
     */
    @KafkaListener(topicPartitions = {@TopicPartition(topic = "topic_query_p3r1", partitions = {"0"})}, groupId = "ConsumerGroupId1")
    public void p3r2ConsumerGroupId0(ConsumerRecord<?, ?> consumer) throws InterruptedException {
        System.out.println("消费者A topic名称:" + consumer.topic() +
                ", key:" + consumer.key() +
                ", value:" + consumer.value() +
                ", 分区位置:" + consumer.partition() +
                ", 下标" + consumer.offset()+"    "+Thread.currentThread().getId());
        Thread.sleep(10);
    }
    /**
     * 消费1分区的topic_query_p3r1主题消费者,ConsumerGroupId1消费组
     */
    @KafkaListener(topicPartitions = {@TopicPartition(topic = "topic_query_p3r1", partitions = {"1"})}, groupId = "ConsumerGroupId1")
    public void p3r2ConsumerGroupId1(ConsumerRecord<?, ?> consumer) throws InterruptedException {
        System.out.println("消费者A topic名称:" + consumer.topic() +
                ", key:" + consumer.key() +
                ", value:" + consumer.value() +
                ", 分区位置:" + consumer.partition() +
                ", 下标" + consumer.offset()+"    "+Thread.currentThread().getId());
        Thread.sleep(10);
    }
    /**
     * 消费2分区的topic_query_p3r1主题消费者,ConsumerGroupId1消费组
     */
    @KafkaListener(topicPartitions = {@TopicPartition(topic = "topic_query_p3r1", partitions = {"2"})}, groupId = "ConsumerGroupId1")
    public void p3r2ConsumerGroupId2(ConsumerRecord<?, ?> consumer) throws InterruptedException {
        System.out.println("消费者A topic名称:" + consumer.topic() +
                ", key:" + consumer.key() +
                ", value:" + consumer.value() +
                ", 分区位置:" + consumer.partition() +
                ", 下标" + consumer.offset()+"    "+Thread.currentThread().getId());
        Thread.sleep(10);
    }

多线程顺序消费

可以看出图都是拿别人的,但是这张图很棒,描述的很清楚

 这段代码把我坑死了,原文是没有implements ApplicationRunner {这段代码的直接开始内容,我弄到我本地根本跑不起来,还报错,最后多方查,看到重写的

run(ApplicationArguments args) { 这个方法带参数,才反推出来时实现了ApplicationRunner 接口,不过兄弟们,这个雷我踩过了,放心食用吧。
@RestController
@Slf4j
public class ShunXuConsumerMoreThread implements ApplicationRunner {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    // 使用两个内存队列
    final int queueLingth = 2;

    // 创建两个内存队列
    Queue<Map> queueA = new ConcurrentLinkedQueue<>();
    Queue<Map> queueB = new ConcurrentLinkedQueue<>();

    /**
     * 投递顺序性消息,根据用户id做取模推送到不同分区的topic中
     * 相同的key推送到相同的分区中
     */
    @RequestMapping("/kafka2")
    public String testKafka2() {
        for (int userId = 0; userId < 300; userId++) {
            kafkaTemplate.send("topic_query_2", userId + "", "insert" + userId);
            kafkaTemplate.send("topic_query_2", userId + "", "update" + userId);
            kafkaTemplate.send("topic_query_2", userId + "", "delete" + userId);
        }
        return null;
    }

    /**
     * 主题消费者-把相同行为的数据放到同一内存队列中
     */
    @KafkaListener(topics = "topic_query_2", groupId = "ConsumerGroupId1")
    public void p3r2ConsumerGroupId0(ConsumerRecord<?, ?> consumer){
        // 1.封装消息参数
        Map param = new HashMap();
        param.put("topic", consumer.topic());
        param.put("key", consumer.key());
        param.put("value", consumer.value());
        param.put("p", consumer.partition());

        // 2.把相同行为(key)数据添加到同一内存队列中
        int queueHash = consumer.key().hashCode() % queueLingth;
        if (queueHash == 0) {
            queueA.add(param);
        }
        if (queueHash == 1) {
            queueB.add(param);
        }
    }

    // 开启两个线程消费内存队列中的消息 ApplicationRunner接口常用于项目启动后,(也就是ApringApplication.run()执行结束),立马执行某些逻辑。
    //这里是立即启动两个线程
    @Override
    public void run(ApplicationArguments args) {
        new Thread() {
            @Override
            public void run() {
                while (true) {
                    if (queueA.size() > 0) {
                        Map poll = queueA.poll();
                        //业务逻辑
                        System.out.println("Thrend-Id: " + Thread.currentThread().getId() +
                                "  topic:" + poll.get("topic") +
                                "  key:" + poll.get("key") +
                                "  value:" + poll.get("value") +
                                "  partition:" + poll.get("p"));

                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }.start();

        new Thread() {
            @Override
            public void run() {
                while (true) {
                    if (queueB.size() > 0) {
                        Map poll = queueB.poll();
                        //业务逻辑
                        System.out.println("Thrend-Id: " + Thread.currentThread().getId() + "  topic:" + poll.get("topic") + "  key:" + poll.get("key") + "  value:" + poll.get("value") + "  partition:" + poll.get("p"));

                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }.start();
    }
}

打印:insertupdatedelete都是有序的。相同行为都在同一线程下执行。

消费是按照顺序的,正常!!!

参考博客:kafka顺序性投递,顺序性消费代码_祁_z的博客-CSDN博客_kafka顺序消费代码 

有关kafka保证数据有序性小结的更多相关文章

  1. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  2. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  3. ruby - 我如何添加二进制数据来遏制 POST - 2

    我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_

  4. 世界前沿3D开发引擎HOOPS全面讲解——集3D数据读取、3D图形渲染、3D数据发布于一体的全新3D应用开发工具 - 2

    无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD

  5. FOHEART H1数据手套驱动Optitrack光学动捕双手运动(Unity3D) - 2

    本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01  客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02  数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit

  6. 使用canal同步MySQL数据到ES - 2

    文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co

  7. ruby-on-rails - 创建 ruby​​ 数据库时惰性符号绑定(bind)失败 - 2

    我正在尝试在Rails上安装ruby​​,到目前为止一切都已安装,但是当我尝试使用rakedb:create创建数据库时,我收到一个奇怪的错误:dyld:lazysymbolbindingfailed:Symbolnotfound:_mysql_get_client_infoReferencedfrom:/Library/Ruby/Gems/1.8/gems/mysql2-0.3.11/lib/mysql2/mysql2.bundleExpectedin:flatnamespacedyld:Symbolnotfound:_mysql_get_client_infoReferencedf

  8. STM32读取串口传感器数据(颗粒物传感器,主动上传) - 2

    文章目录1.开发板选择*用到的资源2.串口通信(个人理解)3.代码分析(注释比较详细)1.主函数2.串口1配置3.串口2配置以及中断函数4.注意问题5.源码链接1.开发板选择我用的是STM32F103RCT6的板子,不过代码大概在F103系列的板子上都可以运行,我试过在野火103的霸道板上也可以,主要看一下串口对应的引脚一不一样就行了,不一样的就更改一下。*用到的资源keil5软件这里用到了两个串口资源,采集数据一个,串口通信一个,板子对应引脚如下:串口1,TX:PA9,RX:PA10串口2,TX:PA2,RX:PA32.串口通信(个人理解)我就从串口采集传感器数据这个过程说一下我自己的理解,

  9. SPI接收数据异常问题总结 - 2

    SPI接收数据左移一位问题目录SPI接收数据左移一位问题一、问题描述二、问题分析三、探究原理四、经验总结最近在工作在学习调试SPI的过程中遇到一个问题——接收数据整体向左移了一位(1bit)。SPI数据收发是数据交换,因此接收数据时从第二个字节开始才是有效数据,也就是数据整体向右移一个字节(1byte)。请教前辈之后也没有得到解决,通过在网上查阅前人经验终于解决问题,所以写一个避坑经验总结。实际背景:MCU与一款芯片使用spi通信,MCU作为主机,芯片作为从机。这款芯片采用的是它规定的六线SPI,多了两根线:RDY和INT,这样从机就可以主动请求主机给主机发送数据了。一、问题描述根据从机芯片手

  10. 微信小程序通过字典表匹配对应数据 - 2

    前言一般来说,前端根据后台返回code码展示对应内容只需要在前台判断code值展示对应的内容即可,但要是匹配的code码比较多或者多个页面用到时,为了便于后期维护,后台就会使用字典表让前端匹配,下面我将在微信小程序中通过wxs的方法实现这个操作。为什么要使用wxs?{{method(a,b)}}可以看到,上述代码是一个调用方法传值的操作,在vue中很常见,多用于数据之间的转换,但由于微信小程序诸多限制的原因,你并不能优雅的这样操作,可能有人会说,为什么不用if判断实现呢?但是if判断的局限性在于如果存在数据量过大时,大量重复性操作和if判断会让你的代码显得异常冗余。wxswxs相当于是一个独立

随机推荐