草庐IT

kafka-offset

全部标签

java - 为什么 Kafka Direct Stream 会为每条消息创建一个新的解码器?

我有一个用Java编写并使用Spark2.1的Spark流应用程序。我正在使用KafkaUtils.createDirectStream来读取来自Kafka的消息。我正在为kafka消息使用kryo编码器/解码器。我在Kafkaproperties->key.deserializer,value.deserializer,key.serializer,value.deserializer中指定了这个当Spark在微批中拉取消息时,使用kryo解码器成功解码消息。但是我注意到Spark执行程序创建了一个新的kryo解码器实例,用于解码从kafka读取的每条消息。我通过将日志放入解码器构造

java - 使用 Spring Boot 1.5 避免 Kafka Streams 在测试中启动

在我的SpringBoot应用程序的测试过程中,我遇到了一个非常烦人的问题。我有一个使用KafkaStreams并在专用配置文件中声明它们的应用程序。@EnableKafka@EnableKafkaStreams@ConfigurationpublicclassKafkaStreamConfiguration{@Bean(name=KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)publicStreamsConfigkStreamsConfigs(){//Omissis}@BeanpublicKStre

java - Apache Kafka 1.0.0 Streams API Multiple Multilevel groupby

我如何在KafkaStreamsAPI中使用具有多个约束的.groupby。与下面的Java8StreamsAPI示例相同publicvoidtwoLevelGrouping(Listpersons){finalMap>>personsByCountryAndCity=persons.stream().collect(groupingBy(Person::getCountry,groupingBy(Person::getCity)));System.out.println("PersonslivinginLondon:"+personsByCountryAndCity.get("UK"

Java Kafka 消费者组未能消费一些消息

注意到一个问题,其中Kafka消费者组(用java实现)始终错过来自代理的一些消息。作为调试的第一线,通过kafka控制台消费者,我可以看到代理中可用的那些消息。Kafka代理版本:0.10.1.0Kafka客户端版本:org.apache.kafkakafka_2.110.9.0.1Kafka消费者配置:Propertiesprops=newProperties();props.put("bootstrap.servers","broker1,broker2,broker3");props.put("group.id","myGroupIdForDemo");props.put("k

java - 如何在消费消息时访问 Kafka header ?

下面是我的配置inputFromKafka经过下面的转换publicMessagetransform(finalMessagemessage){System.out.println("KAFKAMessageHeaders"+message.getHeaders());finalMap>>origData=(Map>>)message.getPayload();//somecodetofigure-outthenonPartitionedDatareturnMessageBuilder.withPayload(nonPartitionedData).build();}不管怎样,上面的打

java - 为什么消费者在使用 Java 客户端 API 在 DC/OS 上使用来自 Kafka 的消息时挂起?

我在AWS上的DC/OS(Mesos)集群上安装了Kafka。启用三个代理并创建一个名为“topic1”的主题。dcoskafkatopiccreatetopic1--partitions3--replication3然后我编写了一个Producer类来发送消息和一个Consumer类来接收它们。publicclassProducer{publicstaticvoidsendMessage(Stringmsg)throwsInterruptedException,ExecutionException{MapproducerConfig=newHashMap();System.out.p

java - Apache Kafka 根据消息的值对窗口消息进行排序

我正在尝试找到一种方法来重新排序主题分区内的消息并将排序后的消息发送到新主题。我有Kafka发布者发送以下格式的字符串消息:{system_timestamp}-{event_name}?{parameters}例如:1494002667893-client.message?chatName=1c&messageBody=hello1494002656558-chat.started?chatName=1c&chatPatricipants=3此外,我们为每条消息添加一些消息key,以将它们发送到相应的分区。我想做的是根据消息的{system-timestamp}部分并在1分钟的窗口内

java - spring boot kafka LocalDateTime

我有一个包含java.time.LocalDateTime的基本POJO:packagefoo.bar.asire.api.model;importjava.time.LocalDateTime;publicclassAddress{privateLongid;privateIntegerhouseNumber;privateStringaddress;privateLocalDateTimecreated;publicAddress(){super();}publicAddress(Longid,IntegerhouseNumber,Stringaddress,LocalDateTi

java - Kafka 0.11 中 sendOffsetsToTransaction 的含义

新的Kafka版本(0.11)支持exactly-once语义。https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging我在Java中使用kafka事务代码设置了一个生产者,就像这样。producer.initTransactions();try{producer.beginTransaction();for(ProducerRecordrecord:payload){producer.send(record);}Mapgrou

java - 如何找到哪个消费者分配给kafka中主题的哪个分区?

我正在构建一个kafka管理器工具,我需要检查哪个主题分区分配给了消费者组中的哪个消费者。假设有消费者组group-A消费主题topic-A,n个分区,那么在group-A托管在不同的VM中。那么如何找到哪个分区分配给哪个消费者主机呢?在kafka0.9.1中可以吗?提前致谢。 最佳答案 您可以检查$KAFKA_HOME/bin/kafka-consumer-groups.sh的工作原理并将其实现集成到您的kafka管理器工具中,该工具将向您展示详细信息组所有者信息(例如,分区分配、滞后、IP)。小组主题分区CURRENT-OFFS