草庐IT

kafka-consumer

全部标签

KafKa基本原理

简介分布式,分区,多副本,zk协调的分布式消息系统使用场景日志收集消息系统用户活动跟踪运营指标对数据安全要求不高的场景核心组成部分brokertopicproducerconsumerconsumergrouppartition原理通信基于tcp协议很多集群信息记录在zk里保证自己的无状态,方便水平扩容leader处理partition的读写请求,followers被动复制leader,不提供读写保证多副本与消费一致性一个partition同一个时刻在一个consumergroup中只能有一个consumerinstance在消费Controller本身是broker分区leader副本故障选

Kafka报错under-replicated partitions

1under-replicatedpartitions异常原因Kafka报错underreplicatedpartitions意味着某些分区的副本数量未达到预期的复制因子。主要有两种原因,Broker故障如果某个KafkaBroker发生故障,导致其中一些分区的副本不再可用,那么这些分区就会被标记为"under-replicated"副本分配策略在Kafka集群中,副本分配策略(replicaassignment)可能导致某些分区的副本分布不均衡。例如,如果你添加或删除了Broker,并且未正确调整副本分配策略,就可能导致副本分布不均匀,从而产生"under-replicatedpartiti

Kafka简单入门02——ISR机制

目录ISR机制ISR关键概念HW和LEOJava使用Kafka通信Kafka生产者示例Kafka消费者示例ISR机制Kafka中的ISR(In-SyncReplicas)机制是一种用于确保数据可靠性和一致性的重要机制。ISR是一组副本,它包括分区的领导者(Leader)和追随者(Follower)副本,这些副本与领导者保持数据同步。ISR关键概念领导者和追随者:每个分区有一个领导者和零个或多个追随者。领导者负责处理客户端的写请求,而追随者主要用于数据复制。ISR集合:ISR集合是分区领导者的一组追随者副本,它们与领导者保持数据同步。只有在ISR集合中的追随者副本可以参与数据的写入和读取操作。数

wsl kafka的简单应用

安装并配置单机版kafka所需环境wsl2环境可用性较高,如下介绍在该环境中安装单机版本kafka的详细过程。启动命令行工具启动wsl:wsl--userroot--cd~,(以root用户启动,进入wsl后当前路径为~“用户主目录”)安装java:进入:https://learn.microsoft.com/zh-cn/java/openjdk/download,选择相应的java版本,下载接口创建java的安装路径:mkdir-p/opt/sdk/java将刚刚下载的javasdk压缩包移动进创建的路径:mv/mnt/c/Users/你的用户名/Downloads/microsoft-jd

关于flink重新提交任务,重复消费kafka的坑

异常现象1按照以下方式设置backend目录和checkpoint目录,fsbackend目录有数据,checkpoint目录没数据env.getCheckpointConfig().setCheckpointStorage(PropUtils.getValueStr(Constant.ENV_FLINK_CHECKPOINT_PATH));env.setStateBackend(newFsStateBackend(PropUtils.getValueStr(Constant.ENV_FLINK_STATEBACKEND_PATH)));原因我以为checkpoint和fsbackend要同时

jmeter简单压测kafka

前言      这也是一个笔记,就是计划用jmeter做性能测试,但是这里是只要将数据放到kafka的topic里,后面查看下游业务处理能力。一、方案      因为只要实现数据放到kafka,参考了下博友的方案,可行。二、方案验证      详细过程就不重复写了。直接上博友的链接吧。1.方案一https://blog.csdn.net/shan286/article/details/1052163812.方案二https://blog.csdn.net/jwcxs_m/article/details/103530869      个人认为方案二简单些。总结      就是笔记,自己备忘,也希

大数据基础设施搭建 - Kafka(with ZooKeeper)

文章目录一、简介二、单机部署2.1上传压缩包2.2解压压缩包2.3修改配置文件(1)配置zookeeper地址(2)修改kafka运行日志(数据)存储路径2.4配置环境变量2.5启动/关闭2.6测试(1)查看当前服务器中的所有topic(2)创建topic等增删改查操作未测试,担心后面升级为集群模式时出问题。三、集群部署3.0清空log.dirs目录并删除zookeeper的kafka节点3.1同步到其他机器(1)同步Kafka软件(2)修改其他机器的broker.id(3)配置其他机器的环境变量3.2启动/停止集群3.3测试(1)查看当前服务器中的所有topic(2)创建topic(3)删除

kafka中消息key作用与分区规则关系

在kafka2.0.0的javasdk中  org.apache.kafka kafka_2.12 2.0.0ProducerRecord中类注释如下Akey/valuepairtobesenttoKafka.Thisconsistsofatopicnametowhichtherecordisbeingsent,anoptionalpartitionnumber,andanoptionalkeyandvalue.Ifavalidpartitionnumberisspecifiedthatpartitionwillbeusedwhensendingtherecord.Ifnopartitioni

【Flink-Kafka-To-RocketMQ】使用 Flink 自定义 Sink 消费 Kafka 数据写入 RocketMQ

【Flink-Kafka-To-RocketMQ】使用Flink自定义Sink消费Kafka数据写入RocketMQ1)准备环境2)代码实现2.1.主程序2.2.conf2.2.1.ConfigTools2.3.utils2.3.1.DBConn2.3.2.CommonUtils2.4.function2.4.1.MqSinkFunction2.5.resources2.5.1.appconfig.yml2.5.2.log4j.properties2.5.3.log4j2.xml1)准备环境这里的maven依赖比较冗余,推荐大家都加上,后面陆续优化。projectxmlns="http://m

MQ - KAFKA 高级篇

kafak是一个分布式流处理平台,提供消息持久化,基于发布-订阅的方式的消息中间件,同时通过消费端配置相同的groupId支持点对点通信。##适用场景:构造实时流数据管道,用于系统或应用之间可靠的消息传输.数据采集及处理,例如连接到一个数据库系统,捕捉表的变更内容.构建实时流式应用程序,对这些流数据进行转换或者影响,如:应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换.应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;异步处理:多应用对消息队列中同一消息进行处理,应用间