我已经在单个节点上设置了kafka,并启动了zookeeper和kafka服务器。我在控制台上针对内部生产者和消费者对其进行了测试,并且运行良好。但是当我在控制台上运行内部kafka消费者时,我的自定义生产者它不起作用。下面是我的Producer类Propertiesprops=newProperties();props.put("metadata.broker.list","xx.xx.xx.xx:9092");props.put("serializer.class","kafka.serializer.StringEncoder");props.put("partitioner.c
我正在阅读最新版本的kafka中的日志压缩,我很好奇这对消费者有何影响。消费者是否像以前一样工作,或者是否有一个新的流程来获取所有最新值?对于“标准”Kafka主题,我使用消费者组来维护指向最新值的指针。但是,如果Kafka基于键而不是时间来保存值,我想知道消费者群体将如何运作? 最佳答案 它不会影响消费者的工作方式。如果你只对每个键的最新值感兴趣并阅读整个主题,你可能仍然会看到一个键的“重复项”(如果不是所有重复项都被消除,或者在上次压缩运行后写入新消息)因此你只关心关于每个键的最新值。关于消费者组:当主题被压缩时,有效偏移量范围
我有一个用Java编写并使用Spark2.1的Spark流应用程序。我正在使用KafkaUtils.createDirectStream来读取来自Kafka的消息。我正在为kafka消息使用kryo编码器/解码器。我在Kafkaproperties->key.deserializer,value.deserializer,key.serializer,value.deserializer中指定了这个当Spark在微批中拉取消息时,使用kryo解码器成功解码消息。但是我注意到Spark执行程序创建了一个新的kryo解码器实例,用于解码从kafka读取的每条消息。我通过将日志放入解码器构造
在我的SpringBoot应用程序的测试过程中,我遇到了一个非常烦人的问题。我有一个使用KafkaStreams并在专用配置文件中声明它们的应用程序。@EnableKafka@EnableKafkaStreams@ConfigurationpublicclassKafkaStreamConfiguration{@Bean(name=KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)publicStreamsConfigkStreamsConfigs(){//Omissis}@BeanpublicKStre
我如何在KafkaStreamsAPI中使用具有多个约束的.groupby。与下面的Java8StreamsAPI示例相同publicvoidtwoLevelGrouping(Listpersons){finalMap>>personsByCountryAndCity=persons.stream().collect(groupingBy(Person::getCountry,groupingBy(Person::getCity)));System.out.println("PersonslivinginLondon:"+personsByCountryAndCity.get("UK"
我想我弄乱了一些东西。昨天一切正常。今天有很多问题..1.我运行了这个命令echo$JAVA_HOME/usr/lib/jvm/java-7-openjdk-amd64/然后mvn--versionError:Couldnotfindorloadmainclassorg.codehaus.plexus.classworlds.launcher.Launcher昨天它运行良好(返回maven版本,我认为是3.0.4)。现在这已经开始产生问题了。怎么办?同样在运行之后$java--版本无法识别的选项:--version无法创建Java虚拟机。这是我的.bashrc文件的内容,我的/etc/
注意到一个问题,其中Kafka消费者组(用java实现)始终错过来自代理的一些消息。作为调试的第一线,通过kafka控制台消费者,我可以看到代理中可用的那些消息。Kafka代理版本:0.10.1.0Kafka客户端版本:org.apache.kafkakafka_2.110.9.0.1Kafka消费者配置:Propertiesprops=newProperties();props.put("bootstrap.servers","broker1,broker2,broker3");props.put("group.id","myGroupIdForDemo");props.put("k
下面是我的配置inputFromKafka经过下面的转换publicMessagetransform(finalMessagemessage){System.out.println("KAFKAMessageHeaders"+message.getHeaders());finalMap>>origData=(Map>>)message.getPayload();//somecodetofigure-outthenonPartitionedDatareturnMessageBuilder.withPayload(nonPartitionedData).build();}不管怎样,上面的打
我在AWS上的DC/OS(Mesos)集群上安装了Kafka。启用三个代理并创建一个名为“topic1”的主题。dcoskafkatopiccreatetopic1--partitions3--replication3然后我编写了一个Producer类来发送消息和一个Consumer类来接收它们。publicclassProducer{publicstaticvoidsendMessage(Stringmsg)throwsInterruptedException,ExecutionException{MapproducerConfig=newHashMap();System.out.p
我正在尝试找到一种方法来重新排序主题分区内的消息并将排序后的消息发送到新主题。我有Kafka发布者发送以下格式的字符串消息:{system_timestamp}-{event_name}?{parameters}例如:1494002667893-client.message?chatName=1c&messageBody=hello1494002656558-chat.started?chatName=1c&chatPatricipants=3此外,我们为每条消息添加一些消息key,以将它们发送到相应的分区。我想做的是根据消息的{system-timestamp}部分并在1分钟的窗口内