草庐IT

Kafka-eagle

全部标签

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表

使用finksql方式将mysql数据同步到kafka中,每次只能同步一张表packageflink;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.TableResult;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;publicclassFlinkSQL_CDC{publicstaticvoidmain(String[]args)th

【flink sql】kafka连接器

Kafka连接器提供从Kafkatopic中消费和写入数据的能力。前面已经介绍了flinksql创建表的语法及说明:【flinksql】创建表这篇博客聊聊怎么通过flinksql连接kafka创建kafka表示例CREATETABLEKafkaTable(`user_id`BIGINT,`item_id`BIGINT,`behavior`STRING,`ts`TIMESTAMP(3)METADATAFROM'timestamp')WITH('connector'='kafka','topic'='user_behavior','properties.bootstrap.servers'='ma

Kafka之常用参数配置整理

Kafka之常用参数配置整理一、Broker端参数1、存储2、Zookeeper3、连接4、Topic管理5、数据留存二、Topic级别参数三、JVM参数四、操作系统参数一、Broker端参数1、存储log.dirs:指定broker使用的若干个文件目录路径。(无默认值,必须指定)log.dir:配置单个路径,用于上个参数的补充。通常情况下,我们只需要设置log.dirs即可。而且建议配置多个路径,比如:/home/kafka1,/home/kafka2,/home/kafka3。并且,如果条件允许,最好将这些目录挂载到不同的物理磁盘。这样做有两个好处:提升读写性能。多块物理磁盘同时读写数据具

Kafka系列之broker-list,bootstrap-server以及zookeeper

broker-listbroker:kafka服务端,可以是一个服务器也可以是一个集群。producer和consumer都相当于这个服务端的客户端。broker-list:指定集群中的一个或多个服务器,一般在使用kafka-console-producer.sh的时候,这个参数是必备参数,另外一个必备的参数是topic。bootstrap-serversvszookeeperbootstrap-servers指的是目标集群的服务器地址,这和broker-list功能一样,不过在consoleproducer要求用broker-list。1、以前用consoleconsumer测试消息收发时会

4.3、Flink任务怎样读取Kafka中的数据

目录1、添加pom依赖2、API使用说明3、这是一个完整的入门案例4、Kafka消息应该如何解析4.1、只获取Kafka消息的value部分​4.2、获取完整Kafka消息(key、value、Metadata)4.3、自定义Kafka消息解析器5、起始消费位点应该如何设置​5.1、earliest()5.2、latest()5.3、timestamp()6、Kafka分区扩容了,该怎么办——动态分区检查7、在加载KafkaSource时提取事件时间&添加水位线7.1、使用内置的单调递增的水位线生成器+kafka timestamp为事件时间7.2、使用内置的单调递增的水位线生成器+kafka

springboot kafka 实现延时队列

好文推荐:2.5万字详解23种设计模式基于Netty搭建websocket集群实现服务器消息推送2.5万字讲解DDD领域驱动设计文章目录一、延时队列定义二、应用场景三、技术实现方案:1.Redis2.Kafka3.RabbitMQ4.RocketMQ四、Kafka延时队列背景五、Kafka延时队列实现思路六、Kafka延时队列架构图七、kafka延时任务代码实现1.KafkaSyncConsumer:Kafka消费者2.KafkaDelayQueue:Kafka延迟队列3.KafkaDelayQueueFactory:Kafka延迟队列工厂4.KafkaPollListener:Kafka延迟

Springboot Kafka 集成配置

Springboot配置使用Kafka前言一、Linux安装Kafka二、构建项目三、引入依赖四、配置文件生产者yml方式Config方式消费者yml方式Config方式五、开始写代码生产者发送成功回调和异常处理消费者接收异常处理七、开始测试测试普通单条消息测试消费者异常处理测试延时消息测试批量消息测试手动控制消费者监听总结前言不多BB讲原理,只教你怎么用,看了全网没有比我更详细的了,yml配置,Config工厂代码配置都有,batch-size、acks、offset、auto-commit、trusted-packages、poll-timeout、linger应有尽有,批量消费、开启事务

Kafka3.0.0版本——Follower故障处理细节原理

目录一、服务器信息二、服务器基本信息及相关概念2.1、服务器基本信息2.2、LEO的概念2.3、HW的概念三、Follower故障处理细节一、服务器信息三台服务器原始服务器名称原始服务器ip节点centos7虚拟机1192.168.136.27broker0centos7虚拟机2192.168.136.28broker1centos7虚拟机3192.168.136.29broker2二、服务器基本信息及相关概念2.1、服务器基本信息首先,分别有3台服务器分别为broker0、broker1、broker2,其中一台为leader,2台follower服务器。每台服务器已经接收到数据,如下图所示

【Kafka超时问题(已解决),kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection w】

Kafka超时问题(已解决),kafka.zookeeper.ZooKeeperClientTimeoutException:Timedoutwaitingforconnectionwhileinstate:CONNECTING一、报错内容及原因二、解决办法三、其他解决方案参考四、Kafka启动命令一、报错内容及原因1.Kafka正常运行一段时间后,用测试工具发送数据时报了错:2.于是单独用黑窗口启动Kafka,看看具体报啥错:3.原因JDK版本和Kafka版本不匹配。(我手里项目较多,用的JDK版本也不一样,有的时候忘了把JDK切换回去,就报了这个错)。我的Kafka版本:二、解决办法更换J

kafka3.4.0集群搭建(无zookeeper)

注意:低版本需要安装zookeeper,在2.8及以上可移除zookeeper前往官网下载!!!不要下载src文件1.解压文件tarxzfkafka_2.13-3.4.0.tgz进入文件cdkafka_2.13-3.4.0进入cdconfig/kraft2.修改server.properties以下属性vimserver.properties3.我们需要在启动服务器之前创建kafka集群id。执行下列命令,并记下运行生成的uuid:./bin/kafka-storage.shrandom-uuidZ_aPqvh6SPyxIIHEo9Os8A!!!只需要一个集群ID4.接下来我们格式化所有存储目