草庐IT

kafka-stream

全部标签

java - 如何在 Java 8 Stream.flatMap(..) 中捕获异常

给定一个Stream和一个返回Stream作为数据源的不同参数的方法,我正在寻找一种通过flatMap合并流的方法(..)并在执行期间捕获某些Exceptions。让我们看下面的代码片段:publicclassFlatMap{publicstaticvoidmain(finalString[]args){longcount;//thismightthrowanexceptioncount=Stream.of(0.2,0.5,0.99).flatMap(chance->getGenerator(chance,20)).count();//tryingtocatchtheexception

java - 使用 Java8 stream api 将函数列表应用于值

我想要一个单条日志消息pojoLoggedExchange并对其应用一系列转换。转换是列表中的一元运算符:Listtransforms=newArrayList();哪里ConditionalTransform工具UnaryOperator我目前的解决方案是像这样使用reduce:publicLoggedExchangetransform(LoggedExchangeoriginal){returntransforms.stream().reduce(original,(o,t)->t.apply(o),(m1,m2)->m2);}并行运行它没有意义,因为无法合并两条消息((m1,m2

java - 如何在消费消息时访问 Kafka header ?

下面是我的配置inputFromKafka经过下面的转换publicMessagetransform(finalMessagemessage){System.out.println("KAFKAMessageHeaders"+message.getHeaders());finalMap>>origData=(Map>>)message.getPayload();//somecodetofigure-outthenonPartitionedDatareturnMessageBuilder.withPayload(nonPartitionedData).build();}不管怎样,上面的打

java - 如何使用 Java 8 Stream 扩展和重组列表列表?

我有一个A类列表,其中包括一个列表本身。publicclassA{publicdoubleval;publicStringid;publicListnames=newArrayList();publicA(doublev,StringID,Stringname){val=v;id=ID;names.add(name);}staticpublicListcreateAnExample(){Listitems=newArrayList();items.add(newA(8.0,"x1","y11"));items.add(newA(12.0,"x2","y21"));items.add(n

java - 如何将这个经典的 Java 代码重写为 Java Stream API 代码?

有一段旧的Java代码(没有lambda表达式):publicListgetAttackedCheckersForPoint(CheckerPositionfrom,booleanisSecondPlayerOwner,booleanisQueen,VectorDirectionignoredDirection){ListallDirections=VectorDirection.generateAllDirections();Listresult=newArrayList();for(VectorDirectiondirection:allDirections){if(!direct

java - 为什么消费者在使用 Java 客户端 API 在 DC/OS 上使用来自 Kafka 的消息时挂起?

我在AWS上的DC/OS(Mesos)集群上安装了Kafka。启用三个代理并创建一个名为“topic1”的主题。dcoskafkatopiccreatetopic1--partitions3--replication3然后我编写了一个Producer类来发送消息和一个Consumer类来接收它们。publicclassProducer{publicstaticvoidsendMessage(Stringmsg)throwsInterruptedException,ExecutionException{MapproducerConfig=newHashMap();System.out.p

java - Apache Kafka 根据消息的值对窗口消息进行排序

我正在尝试找到一种方法来重新排序主题分区内的消息并将排序后的消息发送到新主题。我有Kafka发布者发送以下格式的字符串消息:{system_timestamp}-{event_name}?{parameters}例如:1494002667893-client.message?chatName=1c&messageBody=hello1494002656558-chat.started?chatName=1c&chatPatricipants=3此外,我们为每条消息添加一些消息key,以将它们发送到相应的分区。我想做的是根据消息的{system-timestamp}部分并在1分钟的窗口内

java - spring boot kafka LocalDateTime

我有一个包含java.time.LocalDateTime的基本POJO:packagefoo.bar.asire.api.model;importjava.time.LocalDateTime;publicclassAddress{privateLongid;privateIntegerhouseNumber;privateStringaddress;privateLocalDateTimecreated;publicAddress(){super();}publicAddress(Longid,IntegerhouseNumber,Stringaddress,LocalDateTi

java - Kafka 0.11 中 sendOffsetsToTransaction 的含义

新的Kafka版本(0.11)支持exactly-once语义。https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging我在Java中使用kafka事务代码设置了一个生产者,就像这样。producer.initTransactions();try{producer.beginTransaction();for(ProducerRecordrecord:payload){producer.send(record);}Mapgrou

java - 仅使用两个键和奇数或偶数列表索引作为值将 List 转换为 Map - Java 8 Stream

我想将列表转换为map,只使用两个字符串值作为键值。然后作为值只是包含来自输​​入列表的奇数或偶数索引位置的元素的字符串列表。这是旧时尚代码:Map>map=newHashMap();Listlist=Arrays.asList("one","two","three","four");map.put("evenIndex",newArrayList());map.put("oddIndex",newArrayList());for(inti=0;i如何使用流将此代码转换为Java8以获得此结果?{evenIndex=[one,three],oddIndex=[two,four]}我目前