背景当flink消费kafka的消息时,我们经常会用到FlinkKafkaConsumer进行水位线的发送,本文就从源码看下FlinkKafkaConsumer.assignTimestampsAndWatermarks指定周期性水位线发送的流程FlinkKafkaConsumer水位线发送1.首先从Fetcher类开始,创建Fetcher类的时候会构建一个周期性的水位线发送线程并启动//ifwehaveperiodicwatermarks,kickofftheintervalschedulerif(timestampWatermarkMode==WITH_WATERMARK_GENERATO
背景FlinkKafkaConsumer支持当收到某个kafka分区中的某条记录时发送水位线,比如这条特殊的记录代表一个完整记录的结束等,本文就来解析下发送punctuated水位线的源码punctuated水位线发送源码解析1.首先KafkaFetcher中的runFetchLoop方法publicvoidrunFetchLoop()throwsException{try{//kickofftheactualKafkaconsumerconsumerThread.start();while(running){//thisblocksuntilwegetthenextrecords//itau
我正在使用flink从kafka读取并写入redis。为了测试,我只想读取来自kafka的前10条消息。所以我使用计数器并尝试在counter=10时停止消费者AtomicIntegercounter=newAtomicInteger(0);FlinkKafkaConsumer08kafkaConsumer=newFlinkKafkaConsumer08("mytopic",newSimpleStringSchema(){@OverridepublicbooleanisEndOfStream(StringnextElement){//Itshouldonlyread10kafkames
我正在使用flink从kafka读取并写入redis。为了测试,我只想读取来自kafka的前10条消息。所以我使用计数器并尝试在counter=10时停止消费者AtomicIntegercounter=newAtomicInteger(0);FlinkKafkaConsumer08kafkaConsumer=newFlinkKafkaConsumer08("mytopic",newSimpleStringSchema(){@OverridepublicbooleanisEndOfStream(StringnextElement){//Itshouldonlyread10kafkames