草庐IT

kafka-consumer

全部标签

java - Kafka 0.11 中 sendOffsetsToTransaction 的含义

新的Kafka版本(0.11)支持exactly-once语义。https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging我在Java中使用kafka事务代码设置了一个生产者,就像这样。producer.initTransactions();try{producer.beginTransaction();for(ProducerRecordrecord:payload){producer.send(record);}Mapgrou

java - 如何找到哪个消费者分配给kafka中主题的哪个分区?

我正在构建一个kafka管理器工具,我需要检查哪个主题分区分配给了消费者组中的哪个消费者。假设有消费者组group-A消费主题topic-A,n个分区,那么在group-A托管在不同的VM中。那么如何找到哪个分区分配给哪个消费者主机呢?在kafka0.9.1中可以吗?提前致谢。 最佳答案 您可以检查$KAFKA_HOME/bin/kafka-consumer-groups.sh的工作原理并将其实现集成到您的kafka管理器工具中,该工具将向您展示详细信息组所有者信息(例如,分区分配、滞后、IP)。小组主题分区CURRENT-OFFS

java - 如何使用 Java 中的结构化流从 Kafka 反序列化记录?

我使用Spark2.1。我正在尝试使用SparkStructuredStreaming从Kafka读取记录,反序列化它们并在之后应用聚合。我有以下代码:SparkSessionspark=SparkSession.builder().appName("Statistics").getOrCreate();Datasetdf=spark.readStream().format("kafka").option("kafka.bootstrap.servers",kafkaUri).option("subscribe","Statistics").option("startingOffset

java - 如何连接 Apache Kafka 和 Amazon S3?

我想使用KafkaConnect将来自Kafka的数据存储到存储桶s3中。我已经运行了一个Kafka的主题,并且创建了一个存储桶s3。我的主题有关于Protobuffer的数据,我尝试使用https://github.com/qubole/streamx我得到了下一个错误:[2018-10-0413:35:46,512]INFORevokingpreviouslyassignedpartitions[]forgroupconnect-s3-sink(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)[

java - 在Java中创建之前检查kafka中是否存在主题

我正在尝试使用以下方法在kafka0.8.2中创建一个主题:AdminUtils.createTopic(zkClient,myTopic,2,1,properties);如果我在本地多次运行代码进行测试,则会失败,因为主题已经创建。有没有办法在创建主题之前检查主题是否存在?TopicCommandapi似乎没有为listTopics或describeTopic返回任何内容. 最佳答案 您可以使用kakfa-client版本0.11.0.0的AdminClient示例代码:Propertiesconfig=newProperties

java - 卡夫卡 : No message seen on console consumer after message sent by Java Producer

我是Kafka的新手。我在我的本地机器上创建了一个java生产者,并在网络上的另一台机器上设置了一个Kafka代理,比如M2(我可以ping、SSH、连接到这台机器)。在Eclipse控制台的生产者端,我收到“消息已发送”。但是当我检查机器M2上的控制台消费者时,我看不到这些消息。我的java生产者代码是:importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.pr

java - 亚马逊 SQS : The same message is consumed by two current consumers

我有四个当前消费者在AmazonAWS上收听同一个队列。从队列中拉取消息时,有时会出现同一条消息被两个不同的消费者消费的情况。请看下面的日志:18:01:46,515[jmsContainer-2]DEBUG-从队列中收到消息:ID:3698a927-930b-4d6a-aeca-f6692252879218:02:12,825[jmsContainer-3]DEBUG-从队列中收到消息:ID:3698a927-930b-4d6a-aeca-f66922528792我有一个包含4个并发使用者的JMS容器设置。我将可见性超时设置为30秒。既然container2收到了消息,怎么conta

java - 在kafka中创建多少生产者?

在一个高容量的实时javaweb应用程序中,我正在向apachekafka发送消息.目前我正在向单个主题发送消息,但将来我可能需要向多个主题发送消息。在这种情况下,我不确定是否应该为每个主题创建一个制作人,还是应该为我的所有主题使用一个制作人?这是我的代码:props=newProperties();props.put("zk.connect",:,:,:);props.put("zk.connectiontimeout.ms","1000000");props.put("producer.type","async");Producerproducer=newkafka.javaapi

java - 是否有 Java-8 之前的功能接口(interface)可以替代 java.util.function.Consumer<T>?

为了迁移到Java8,我尝试以有利于使用lambda的方式编写我的代码。我需要一个功能接口(interface),该接口(interface)具有一个方法,该方法采用某种类型的一个参数T并返回void。这是java.util.function.Consumer的accept()方法的签名,但我当然还不能使用它。我可以使用标准Java7(最好是Java6)API中的另一个接口(interface)吗?我知道我可以创建自己的,但尤其是。在将此代码移植到Java8之前,如果我可以使用已经从标准Java6/7API中熟悉的标准接口(interface),那么可读性会更好。到目前为止我发现的最接

java - 使用 Kafka Streams 开发时,Lib 上的 UnsatisfiedLinkError 会影响 DB dll

我正在我的开发Windows机器上编写KafkaStreams应用程序。如果我尝试使用KafkaStreams的leftJoin和branch功能,在执行jar应用程序时会出现以下错误:Exceptioninthread"StreamThread-1"java.lang.UnsatisfiedLinkError:C:\Users\user\AppData\Local\Temp\librocksdbjni325337723194862275.dll:Can'tfinddependentlibrariesatjava.lang.ClassLoader$NativeLibrary.load(