草庐IT

FlinkKafkaConsumer

全部标签

源码解析FlinkKafkaConsumer支持周期性水位线发送

背景当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程FlinkKafkaConsumer水位线发送1.首先从Fetcher类开始,创建Fetcher类的时候会构建一个周期性的水位线发送线程并启动//ifwehaveperiodicwatermarks,kickofftheintervalschedulerif(timestampWatermarkMode==WITH_WATERMARK_GENERATO

源码解析FlinkKafkaConsumer支持punctuated水位线发送

背景FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线,比如这条特殊的记录代表一个完整记录的结束等,本文就来解析下发送punctuated水位线的源码punctuated水位线发送源码解析1.首先KafkaFetcher中的runFetchLoop方法publicvoidrunFetchLoop()throwsException{try{//kickofftheactualKafkaconsumerconsumerThread.start();while(running){//thisblocksuntilwegetthenextrecords//itau

java - 从kafka到redis的flink管道

我正在使用flink从kafka读取并写入redis。为了测试,我只想读取来自kafka的前10条消息。所以我使用计数器并尝试在counter=10时停止消费者AtomicIntegercounter=newAtomicInteger(0);FlinkKafkaConsumer08kafkaConsumer=newFlinkKafkaConsumer08("mytopic",newSimpleStringSchema(){@OverridepublicbooleanisEndOfStream(StringnextElement){//Itshouldonlyread10kafkames

java - 从kafka到redis的flink管道

我正在使用flink从kafka读取并写入redis。为了测试,我只想读取来自kafka的前10条消息。所以我使用计数器并尝试在counter=10时停止消费者AtomicIntegercounter=newAtomicInteger(0);FlinkKafkaConsumer08kafkaConsumer=newFlinkKafkaConsumer08("mytopic",newSimpleStringSchema(){@OverridepublicbooleanisEndOfStream(StringnextElement){//Itshouldonlyread10kafkames