我刚刚启动并运行了Kafka0.8beta1。我有一个非常简单的示例启动并运行,问题是,我只能让一个消息消费者工作,而不是几个。也就是说,runSingleWorker()方法有效。run()方法不起作用:importkafka.consumer.ConsumerIterator;importkafka.consumer.KafkaStream;importkafka.consumer.ConsumerConfig;importkafka.javaapi.consumer.ConsumerConnector;importjava.util.Map;importjava.util.Lis
我有一个消费者worker应用程序,它在内部启动X个线程,每个线程都生成它的KafkaCosnumer。Cosnumers具有相同的groupId并且订阅了相同的主题。因此,每个消费者都能公平地分配分区。处理的本质是我不能丢失消息,也不能允许重复。我运行的kafka版本是0.10.2.1。这是我面临的问题:消费者线程1开始消费消息,并且在poll()上获取了一批消息。我还实现了ConsumerRebalanceListener,这样每次成功处理消息时,它都会被添加到offsets映射中。(见下面的代码。)因此,一旦重新平衡发生,我可以在我的分区重新分配给其他消费者之前提交我的偏移量。有
我正在学习Kafka,我想知道当我消费来自主题的消息时如何指定然后分区。我找到了几张这样的图片:这意味着一个消费者可以消费来自多个分区的消息,但一个分区只能由单个消费者(在消费者组内)读取。此外,我已经阅读了几个消费者示例,它们看起来像这样:Propertiesprops=newProperties();props.put("bootstrap.servers","localhost:9092");props.put("group.id","consumer-tutorial");props.put("key.deserializer",StringDeserializer.class
我们有不同的消费者群体,可以消耗来自不同主题的数据。我们对该主题有不同的分区。我们需要允许某些消费者群体可以访问特定主题,以便只有这些组才能从主题中阅读。KAFKA访问控制列表:bin/kafka-acls.sh--authorizerkafka.security.auth.SimpleAclAuthorizer--authorizer-propertieszookeeper.connect=localhost:2181--add--allow-principalUser:Bob--consumer--topictest-topic--consumer-groupGroup-1但是,据我们了解
我很确定bootstrap.servers是正确的。是不是Maven有什么冲突或者Kafka有什么问题??在此之前它运行成功。我添加了一些Maven或Spark然后出了点问题..谁知道怎么解决?这是java中的kafka代码Propertiesprops=newProperties();props.put("bootstrap.servers","x.xx.xxx.xxx:9092");props.put("metadata.broker.list","x.xx.xxx.xxx:9091,x.xx.xxx.xxx:9092,x.xx.xxx.xxx:9093");props.put("
我是kafka的初学者我们正在寻找我们的kafka集群(一个5节点集群)的大小,以处理17,000个事件/秒,每个事件的大小为600字节。我们计划复制3个事件并将事件保留一周我在kafka的文档页面看了assumingyouwanttobeabletobufferfor30secondsandcomputeyourmemoryneedaswrite_throughput*30.那么这到底是怎么写的呢?如果它是每秒MB的数量-我正在查看9960MB/Secifconsiderthatasmywritethroughputthenthememorycalculatesas292GB(996
我有以下结构:zookeeper:3.4.12kafka:kafka_2.11-1.1.0server1:zookeeper+kafkaserver2:zookeeper+kafkaserver3:zookeeper+kafka通过kafka-topicsshell脚本创建了复制因子为3且分区为3的主题。./kafka-topics.sh--create--zookeeperlocalhost:2181--topictest-flow--partitions3--replication-factor3并使用localConsumers组。当领导没问题时,它工作正常。./kafka-to
我有以下结构:zookeeper:3.4.12kafka:kafka_2.11-1.1.0server1:zookeeper+kafkaserver2:zookeeper+kafkaserver3:zookeeper+kafka通过kafka-topicsshell脚本创建了复制因子为3且分区为3的主题。./kafka-topics.sh--create--zookeeperlocalhost:2181--topictest-flow--partitions3--replication-factor3并使用localConsumers组。当领导没问题时,它工作正常。./kafka-to
我有以下flume代理配置来从kafka源读取消息并将它们写回HDFS接收器tier1.sources=source1tier1.channels=channel1tier1.sinks=sink1tier1.sources.source1.type=org.apache.flume.source.kafka.KafkaSourcetier1.sources.source1.zookeeperConnect=192.168.0.100:2181tier1.sources.source1.topic=testtier1.sources.source1.groupId=flumetier1
Kafka中有40个主题和编写的SparkStreaming作业,每个主题处理5个表。sparkstreaming作业的唯一目标是读取5个kafka主题并将其写入相应的5个hdfs路径。大多数时候它工作正常,但有时它会将主题1数据写入其他hdfs路径。下面的代码试图归档一个sparkstreaming作业来处理5个主题并将其写入相应的hdfs,但是这个将主题1数据写入HDFS5而不是HDFS1。请提供您的建议:importjava.text.SimpleDateFormatimportorg.apache.kafka.common.serialization.StringDeseria