草庐IT

Kafka-Source

全部标签

java - 使用 Spring Boot 1.5 避免 Kafka Streams 在测试中启动

在我的SpringBoot应用程序的测试过程中,我遇到了一个非常烦人的问题。我有一个使用KafkaStreams并在专用配置文件中声明它们的应用程序。@EnableKafka@EnableKafkaStreams@ConfigurationpublicclassKafkaStreamConfiguration{@Bean(name=KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)publicStreamsConfigkStreamsConfigs(){//Omissis}@BeanpublicKStre

java - Apache Kafka 1.0.0 Streams API Multiple Multilevel groupby

我如何在KafkaStreamsAPI中使用具有多个约束的.groupby。与下面的Java8StreamsAPI示例相同publicvoidtwoLevelGrouping(Listpersons){finalMap>>personsByCountryAndCity=persons.stream().collect(groupingBy(Person::getCountry,groupingBy(Person::getCity)));System.out.println("PersonslivinginLondon:"+personsByCountryAndCity.get("UK"

"Editing Java Source"中的 Java Eclipse 调试快捷方式

我喜欢在Eclipse中调试我的Java应用程序时使用快捷方式。F5、F6、F7和F8键仅在调试视角下有效。因此,我尝试使用“复制命令”按钮复制所有键(首选项->常规->键),并使它们在“编辑Java源代码”时起作用。我点击了“应用”按钮,但仍然不起作用。如何使键(F5、F6、F7和F8)在Java透视图中起作用?谢谢!最好的问候,孔编辑:我使用EclipseIndigoServiceRelease1 最佳答案 从这个旧bug13513:Togetthe"F"buttonstowork,youcurrentlyhavetoenabl

java - 如何配置 jdee `find-class-source-file` 以使用 Cassandra 源代码树?

我正在尝试为即将开始的Java项目安装Emacs+JDEE,但我无法让JDEE正常工作。我想解决的第一个问题是能够跳转到给定类名的源文件。我正在使用Cassandrasource作为我使用JDEE的Playground。我已经设置了一个新的Ubuntu12.04VM和Emacs23.3.1作为我的测试环境。我正在使用默认Ubuntu存储库中的openjdk-1.6并且JDEE似乎正在使用它。我下载了jdeetarball并将其解压到~/.emacs.d中。Cassandra源代码树被checkout到~/cassandra。我的~/.emacs.d/init.el包含以下内容:(set

Java Kafka 消费者组未能消费一些消息

注意到一个问题,其中Kafka消费者组(用java实现)始终错过来自代理的一些消息。作为调试的第一线,通过kafka控制台消费者,我可以看到代理中可用的那些消息。Kafka代理版本:0.10.1.0Kafka客户端版本:org.apache.kafkakafka_2.110.9.0.1Kafka消费者配置:Propertiesprops=newProperties();props.put("bootstrap.servers","broker1,broker2,broker3");props.put("group.id","myGroupIdForDemo");props.put("k

java - 如何在消费消息时访问 Kafka header ?

下面是我的配置inputFromKafka经过下面的转换publicMessagetransform(finalMessagemessage){System.out.println("KAFKAMessageHeaders"+message.getHeaders());finalMap>>origData=(Map>>)message.getPayload();//somecodetofigure-outthenonPartitionedDatareturnMessageBuilder.withPayload(nonPartitionedData).build();}不管怎样,上面的打

java - 为什么消费者在使用 Java 客户端 API 在 DC/OS 上使用来自 Kafka 的消息时挂起?

我在AWS上的DC/OS(Mesos)集群上安装了Kafka。启用三个代理并创建一个名为“topic1”的主题。dcoskafkatopiccreatetopic1--partitions3--replication3然后我编写了一个Producer类来发送消息和一个Consumer类来接收它们。publicclassProducer{publicstaticvoidsendMessage(Stringmsg)throwsInterruptedException,ExecutionException{MapproducerConfig=newHashMap();System.out.p

java - Apache Kafka 根据消息的值对窗口消息进行排序

我正在尝试找到一种方法来重新排序主题分区内的消息并将排序后的消息发送到新主题。我有Kafka发布者发送以下格式的字符串消息:{system_timestamp}-{event_name}?{parameters}例如:1494002667893-client.message?chatName=1c&messageBody=hello1494002656558-chat.started?chatName=1c&chatPatricipants=3此外,我们为每条消息添加一些消息key,以将它们发送到相应的分区。我想做的是根据消息的{system-timestamp}部分并在1分钟的窗口内

java - spring boot kafka LocalDateTime

我有一个包含java.time.LocalDateTime的基本POJO:packagefoo.bar.asire.api.model;importjava.time.LocalDateTime;publicclassAddress{privateLongid;privateIntegerhouseNumber;privateStringaddress;privateLocalDateTimecreated;publicAddress(){super();}publicAddress(Longid,IntegerhouseNumber,Stringaddress,LocalDateTi

java - Kafka 0.11 中 sendOffsetsToTransaction 的含义

新的Kafka版本(0.11)支持exactly-once语义。https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging我在Java中使用kafka事务代码设置了一个生产者,就像这样。producer.initTransactions();try{producer.beginTransaction();for(ProducerRecordrecord:payload){producer.send(record);}Mapgrou