下面是我的配置inputFromKafka经过下面的转换publicMessagetransform(finalMessagemessage){System.out.println("KAFKAMessageHeaders"+message.getHeaders());finalMap>>origData=(Map>>)message.getPayload();//somecodetofigure-outthenonPartitionedDatareturnMessageBuilder.withPayload(nonPartitionedData).build();}不管怎样,上面的打
我在AWS上的DC/OS(Mesos)集群上安装了Kafka。启用三个代理并创建一个名为“topic1”的主题。dcoskafkatopiccreatetopic1--partitions3--replication3然后我编写了一个Producer类来发送消息和一个Consumer类来接收它们。publicclassProducer{publicstaticvoidsendMessage(Stringmsg)throwsInterruptedException,ExecutionException{MapproducerConfig=newHashMap();System.out.p
我正在尝试找到一种方法来重新排序主题分区内的消息并将排序后的消息发送到新主题。我有Kafka发布者发送以下格式的字符串消息:{system_timestamp}-{event_name}?{parameters}例如:1494002667893-client.message?chatName=1c&messageBody=hello1494002656558-chat.started?chatName=1c&chatPatricipants=3此外,我们为每条消息添加一些消息key,以将它们发送到相应的分区。我想做的是根据消息的{system-timestamp}部分并在1分钟的窗口内
我有一个包含java.time.LocalDateTime的基本POJO:packagefoo.bar.asire.api.model;importjava.time.LocalDateTime;publicclassAddress{privateLongid;privateIntegerhouseNumber;privateStringaddress;privateLocalDateTimecreated;publicAddress(){super();}publicAddress(Longid,IntegerhouseNumber,Stringaddress,LocalDateTi
新的Kafka版本(0.11)支持exactly-once语义。https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging我在Java中使用kafka事务代码设置了一个生产者,就像这样。producer.initTransactions();try{producer.beginTransaction();for(ProducerRecordrecord:payload){producer.send(record);}Mapgrou
我正在构建一个kafka管理器工具,我需要检查哪个主题分区分配给了消费者组中的哪个消费者。假设有消费者组group-A消费主题topic-A,n个分区,那么在group-A托管在不同的VM中。那么如何找到哪个分区分配给哪个消费者主机呢?在kafka0.9.1中可以吗?提前致谢。 最佳答案 您可以检查$KAFKA_HOME/bin/kafka-consumer-groups.sh的工作原理并将其实现集成到您的kafka管理器工具中,该工具将向您展示详细信息组所有者信息(例如,分区分配、滞后、IP)。小组主题分区CURRENT-OFFS
我使用Spark2.1。我正在尝试使用SparkStructuredStreaming从Kafka读取记录,反序列化它们并在之后应用聚合。我有以下代码:SparkSessionspark=SparkSession.builder().appName("Statistics").getOrCreate();Datasetdf=spark.readStream().format("kafka").option("kafka.bootstrap.servers",kafkaUri).option("subscribe","Statistics").option("startingOffset
我想使用KafkaConnect将来自Kafka的数据存储到存储桶s3中。我已经运行了一个Kafka的主题,并且创建了一个存储桶s3。我的主题有关于Protobuffer的数据,我尝试使用https://github.com/qubole/streamx我得到了下一个错误:[2018-10-0413:35:46,512]INFORevokingpreviouslyassignedpartitions[]forgroupconnect-s3-sink(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)[
我正在尝试使用以下方法在kafka0.8.2中创建一个主题:AdminUtils.createTopic(zkClient,myTopic,2,1,properties);如果我在本地多次运行代码进行测试,则会失败,因为主题已经创建。有没有办法在创建主题之前检查主题是否存在?TopicCommandapi似乎没有为listTopics或describeTopic返回任何内容. 最佳答案 您可以使用kakfa-client版本0.11.0.0的AdminClient示例代码:Propertiesconfig=newProperties
在一个高容量的实时javaweb应用程序中,我正在向apachekafka发送消息.目前我正在向单个主题发送消息,但将来我可能需要向多个主题发送消息。在这种情况下,我不确定是否应该为每个主题创建一个制作人,还是应该为我的所有主题使用一个制作人?这是我的代码:props=newProperties();props.put("zk.connect",:,:,:);props.put("zk.connectiontimeout.ms","1000000");props.put("producer.type","async");Producerproducer=newkafka.javaapi