这里写目录标题SpringBoot集成Kafka1、构建项目1.1、引入依赖1.2、application.yml配置1.3、简单生产1.4、简单消费2、生产者2.1、带回调的生产者2.2、监听器2.3、自定义分区器2.4、事务提交3、消费者3.1、指定topic、partition、offset消费3.2、异常处理3.3、消息过滤器3.4、消息转发3.5、定时启动、停止3.6、手动确认消息4、配置详解4.1、生产者yml方式4.2、生产者Config方式4.3、消费者yml方式4.4、消费者Config方式5、注解消费示例5.1、简单消费5.2、监听多个主题5.3、监听一个主题,指定分区消费
场景说明100万日活,每人每天100条日志,每天总共的日志条数是100万*100条=1亿条(中型公司偏小)。1亿/24小时/60分/60秒 = 1150条/每秒钟。每条日志大小:0.5k-2k(取1k)(甚至更多根据实际情况)。1150条/每秒钟*1k≈1m/s。高峰期(中午小高峰下午8-12):1m/s * 20倍 =20m/s 最大40m/s。每秒多少数据量:20MB/s。 服务器台数选择 服务器台数= 2 *(生产者峰值生产速率*副本/100)+ 1 = 2 *(20m/s * 2 / 100)+ 1 = 3台建议3台服务器
作者:禅与计算机程序设计艺术1.简介ApacheKafka是一个分布式流平台,主要应用于构建实时数据管道和流处理应用程序。Kafka是开源项目,由LinkedIn开源。它提供了高吞吐量、低延迟、可扩展性和容错能力。由于其内置了集群管理功能,因此可以方便地实现横向扩展。 本文档从以下方面详细介绍Kafka: 1)概述:包括ApacheKafka的定义、特性和优点;2)安装配置:介绍如何安装及配置Kafka,并介绍相关命令及工具;3)核心概念:介绍Kafka的核心概念,包括主题(Topic)、分区(Partition)、消息(Message)等;4)生产者和消费者:介绍如何通过生产者向Kafk
kafka配置类用途:定义使用的基本kafka配置,以及定义Bean下面文件是读取本地spring的标准配置文件的类,用于一般属性获取等操作importlombok.Data;importorg.apache.kafka.clients.consumer.ConsumerConfig;importorg.apache.kafka.common.serialization.StringDeserializer;importorg.springframework.boot.context.properties.ConfigurationProperties;importorg.springfram
kafka详解二1、offset1.1offset介绍老版本Consumer的位移管理是依托于ApacheZooKeeper的,它会自动或手动地将位移数据提交到ZooKeeper中保存。当Consumer重启后,它能自动从ZooKeeper中读取位移数据,从而在上次消费截止的地方继续消费。这种设计使得KafkaBroker不需要保存位移数据,减少了Broker端需要持有的状态空间,因而有利于实现高伸缩性。新版本Consumer的位移管理机制其实也很简单,就是**将Consumer的位移数据作为一条条普通的Kafka消息,提交到__consumer_offsets中。可以这么说,__consum
文章目录1.Kafka消费者是什么?2.Kafka消费者组的概念?3.Kafka消费者和消费者组有什么关系?4.Kafka多个消费者如何同时消费一个分区?1.Kafka消费者是什么?消费者负责订阅Kafka中的主题,并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。2.Kafka消费者组的概念?假设我们有一个应用程序,它从一个Kafka主题读取消息,在对这些消息做一些验证后再把它们保存起来。应用程序需要创建一个消费者对象,订阅主题并开始接收消息
docker-compose安装带kafka-ui的kafka集群在日常的工作当中,kafka集群作为常用的中间件,其搭建过程略显繁琐,需要配置的文件颇多,为了方便各位初学者快速体验kafka的魅力,本文采取一键式安装kafka-3.3.1(不带zookeeper版本)的集群化安装。仅需将下面配置中的10.0.0.147改为自己本机的ip即可version:"3"services:#kafka可视化工具kafka-ui:container_name:kafka-uiimage:provectuslabs/kafka-ui:latestports:-8989:8080depends_on:-ka
排除步骤1:使用group_id=”consumer_group_id_001“和 auto_offset_reset="earliest"fromkafkaimportKafkaConsumerconsumer=KafkaConsumer(bootstrap_servers=["dev-kafka01.test.xxx.cloud:9092"],enable_auto_commit=True,auto_commit_interval_ms=5000,group_id="test-consumer-group",auto_offset_reset="earliest")consumer.sub
一、服务端接收消费者拉取数据的方法二、遍历请求中需要拉取数据的主题分区集合,分别执行查询数据操作,1、会选择合适的副本读取本地日志数据(2.4版本后支持主题分区多副本下的读写分离)三、会判断当前请求是主题分区Follower发送的拉取数据请求还是消费者客户端拉取数据请求1、拉取数据之前首先要得到leaderIsrUpdateLock的读锁2、readFromLocalLog读取本地日志数据四、读取日志数据就是读取的segment文件1、获取当前本地日志的基础数据(高水位线,偏移量等),2、遍历segment,直到从segment读取到数据五、创建带有读取指定文件位置通道的文件记录对象FileR
1、安装1.1下载与安装kafka下载地址:ApacheKafka需要说明的是,kafka的安装依赖于zk,zk的部署可直接参考《Zookeeper介绍与基本部署》。当然,kafka默认也内置了zk的启动脚本,在kafka安装路径的bin目录下,名称为zookeeper-server-start.sh,如果不想独立安装zk,可直接使用该脚本。1.2配置kafkakafka的配置文件在/app/www/kafka/config/server.properties中broker.id=0listeners=PLAINTEXT://10.0.2.5:9092num.network.threads=3