草庐IT

Kafka-eagle

全部标签

java - 如何通过 Java 在 Kafka 中创建主题

我想通过java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题。如果我在命令提示符中创建一个主题,并且如果我通过javaapi推送消息,它工作正常。但我想通过javaapi创建一个主题。经过长时间的搜索,我找到了下面的代码,ZkClientzkClient=newZkClient("localhost:2181",10000,10000);AdminUtils.createTopic(zkClient,myTopic,10,1,newProperties());我尝试了上面的代码,它显示主题已创建,但我无法在主题中推送消息。我的代码有什么问题吗?或者任何其他方式来

java - 如何通过 Java 在 Kafka 中创建主题

我想通过java在Kafka(kafka_2.8.0-0.8.1.1)中创建一个主题。如果我在命令提示符中创建一个主题,并且如果我通过javaapi推送消息,它工作正常。但我想通过javaapi创建一个主题。经过长时间的搜索,我找到了下面的代码,ZkClientzkClient=newZkClient("localhost:2181",10000,10000);AdminUtils.createTopic(zkClient,myTopic,10,1,newProperties());我尝试了上面的代码,它显示主题已创建,但我无法在主题中推送消息。我的代码有什么问题吗?或者任何其他方式来

【运维笔记】kafka跨域通信代理

kafka跨域通信代理场景描述模拟思路模拟环境说明基础环境kafka版本环境部署基础软件安装编写kafka的docker-compose.yml文件环境验证解决方案Kafka通信机制解决思路代理配置验证是否满足要求在项目部署过程中遇到kafka需要走代理跨域通信的情景,搭建了一套环境模拟实验,以此记录。场景描述两套kafka集群KafkaS和KafkaC分别位于两个不互通的网络域,跨域互访需要经过nginx代理机,现需要确认nginx、kafka的配置。模拟思路利用docker在一台虚拟机上创建两个网络不互通的kafka集群,同时在宿主机上部署nginx。模拟环境说明基础环境1、宿主机:Ubu

【面试】Kafka面试题

文章目录1、Kafka是什么?2、partition的数据文件(offffset,MessageSize,data)3、数据文件分段segment(顺序读写、分段命令、二分查找)4、负载均衡(partition会均衡分布到不同broker上)5、批量发送6、压缩(GZIP或Snappy)7、消费者设计8、ConsumerGroup9、如何获取topic主题的列表10、生产者和消费者的命令行是什么?11、consumer是推还是拉?12、讲讲kafka维护消费状态跟踪的方法13、讲一下主从同步。14、为什么需要消息系统,mysql不能满足需求吗?15、Zookeeper对于Kafka的作用是什么

SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka

  最近做的一个项目,使用的是pg数据库,公司没有成熟的DCD组件,为了实现数据变更消息发布的功能,我使用SpringBoot集成Flink-CDC采集PostgreSQL变更数据发布到Kafka。一、业务价值监听数据变化,进行异步通知,做系统内异步任务。架构方案(懒得写了,看图吧):二、修改数据库配置2.1、更改配置文件postgresql.conf#更改wal日志方式为logical(必须)wal_level=logical#minimal,replica,orlogical#更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots(每个文档都这么说,但根据我

SpringBoot集成Flink-CDC 采集PostgreSQL变更数据发布到Kafka

  最近做的一个项目,使用的是pg数据库,公司没有成熟的DCD组件,为了实现数据变更消息发布的功能,我使用SpringBoot集成Flink-CDC采集PostgreSQL变更数据发布到Kafka。一、业务价值监听数据变化,进行异步通知,做系统内异步任务。架构方案(懒得写了,看图吧):二、修改数据库配置2.1、更改配置文件postgresql.conf#更改wal日志方式为logical(必须)wal_level=logical#minimal,replica,orlogical#更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots(每个文档都这么说,但根据我

Kafka:消费者消费失败处理-重试队列

kafka没有重试机制不支持消息重试,也没有死信队列,因此使用kafka做消息队列时,需要自己实现消息重试的功能。实现创建新的kafka主题作为重试队列:创建一个topic作为重试topic,用于接收等待重试的消息。普通topic消费者设置待重试消息的下一个重试topic。从重试topic获取待重试消息储存到redis的zset中,并以下一次消费时间排序定时任务从redis获取到达消费事件的消息,并把消息发送到对应的topic同一个消息重试次数过多则不再重试 代码实现 依赖 org.springframework.bootspring-boot-starter-data-redisorg.sp

【Kafka】微服务学习笔记九:什么是消息中间件&Kafka的介绍及使用

 个人简介: >📦个人主页:赵四司机>🏆学习方向:JAVA后端开发 >📣种一棵树最好的时间是十年前,其次是现在!>⏰往期文章:SpringBoot项目整合微信支付>🧡喜欢的话麻烦点点关注喔,你们的支持是我的最大动力。前言:1.前面基于Springboot的单体项目介绍已经完结了,至于项目中的其他功能实现我这里就不打算介绍了,因为涉及的知识点不难,而且都是简单的CRUD操作,假如有兴趣的话可以私信我我再看看要不要写几篇文章做个介绍。2.完成上一阶段的学习,我就投入到了微服务的学习当中,所用教程为B站上面黑马的微服务教程。由于我的记性不是很好,所以对于新事物的学习我比较喜欢做笔记以加强理解,在这里

配置Kafka消息保留时间

生产者发送消息给kafka,消息被追加值日志文件并保留一定周期(基于配置)。本文探讨对Kafk主题配置消息保留时间。基于时间保留通过保留期属性,消息就有了TTL(timetolive生存时间)。到期后,消息被标记为删除,从而释放磁盘空间。对于kafka主题中所有消息具有相同的生存时间,但可以在创建主题之前设置属性,或对已存在的主题在运行时修改属性。接下来我们将学习如何通过代理配置属性进行调整,以设置新主题的保留周期,并通过主题级配置在运行时控制它。服务器级配置ApacheKafka支持服务器级配置保留策略,我们可以通过配置以下三个基于时间的配置属性中的一个来进行优化:log.retention

三、Kafka消费者

目录1.消费者与消费者组2.客户端开发2.1配置必要的参数2.2订阅主题与分区2.3反序列化2.4消息消费2.5位移提交2.6控制或关闭消费2.7指定位移消费2.8再均衡2.9消费者拦截器2.10多线程实现2.11重要的消费者参数1.消费者与消费者组消费者(Consumer)负责订阅Kafka主题,并从订阅的主题上拉取消息。与其它消息中间件不同的是:Kafka消费理念中还有一层消费者组(ConsumerGroup)的概念,一个消费者组包含多个消费者,消息发布到主题后,会投递给每个消费者组中的其中一个消费者。如上图,一个主题有四个分区,P0、P1、P2、P3。两个消费者组A、B都订阅了这个主题,