草庐IT

ConsumeKafka

全部标签

hadoop - 使用 ConsumeKafka 处理器时创建更大的 NiFi 流文件

我创建了一个简单的NiFi管道,它从Kafka主题读取数据流(使用ConsumeKafka)并将其写入HDFS(使用PutHDFS)。目前,我看到在HDFS上创建了许多小文件。大约每秒创建一个新文件,有些文件只有一条或两条记录。我希望将更少、更大的文件写入HDFS。我在ConsumeKafka中有以下设置:MessageDemarcator=MaxPollRecords=10000MaxUncommittedTime=20s过去我使用Flume而不是Nifi,它有batchSize和batchDurationMillis,这让我可以调整HDFS文件的大小。Nifi中的ConsumeKa