草庐IT

apache-kafka-streams

全部标签

java - RabbitMQ - Apache Camel Reading Messages 如何处理失败的消息

我有以下PHP应用程序。将用户注册发布到消息队列。Java应用程序从该队列中读取并导入它。希望下图能够描述它。我只处理Java方面的事情。json消息已经存在于队列中。路由(Java消费端)。@ComponentpublicclassSignUpRouting{errorHandler(deadLetterChannel("rabbitmq://signUpDeadLetter.exchange?username=etc..").useOriginalMessage());from("rabbitmq://phpSignUp.exchange?username=etc....").ro

java - Apache Storm Bolt 任务在一段时间后未收到消息

我们有一个Storm拓扑,其中配置了一个spout和两个bolts。Spout不断地从数据库中查询数据并将元组发送到第一个bolt进行一些处理。第一个bolt进行一些处理并将元组发送到第二个bolt,第二个bolt调用第三方网络服务并发送数据。所以,一段时间后发生了什么,最后一个bolt没有得到任何元组,如果我们重新启动拓扑它工作正常。这里只有最后一个bolt有问题。其他spout和firstbolt运行良好,我没有使用acking框架。在这种情况下,我只配置了一个worker`。TopologyBuilderbuilder=newTopologyBuilder();builder.s

java - Apache Poi 3.13 找不到打开 XLSX 文件的类

我正在使用apachePOI通过Java读写Excel文件,但我无法在源代码中找到WorkbookFactory或XSSFWorkbook来读取xlsx文件。pom.xml:3.13org.apache.poipoi${poi.version}org.apache.poipoi-ooxml${poi.version}我在apachepoi的变更日志中找不到任何可能导致此行为的信息。编辑:这是我的实现(暂时只是一个简单的方法)publicstaticHSSFSheetgetXLSSheet(StringfileName,intsheetIndex)throwsIOException{In

java - kafka log-compaction消费数据

我正在阅读最新版本的kafka中的日志压缩,我很好奇这对消费者有何影响。消费者是否像以前一样工作,或者是否有一个新的流程来获取所有最新值?对于“标准”Kafka主题,我使用消费者组来维护指向最新值的指针。但是,如果Kafka基于键而不是时间来保存值,我想知道消费者群体将如何运作? 最佳答案 它不会影响消费者的工作方式。如果你只对每个键的最新值感兴趣并阅读整个主题,你可能仍然会看到一个键的“重复项”(如果不是所有重复项都被消除,或者在上次压缩运行后写入新消息)因此你只关心关于每个键的最新值。关于消费者组:当主题被压缩时,有效偏移量范围

java - Apache Spark 如何将函数发送到引擎盖下的其他机器

我开始使用Pyspark进行一些数据处理。我可以做一些像这样的事情对我来说很有趣rdd.map(lambdax:(x['somekey'],1)).reduceByKey(lambdax,y:x+y).count()它会将这些函数中的逻辑发送到可能多台机器上以并行执行。现在,如果我有Java背景,如果我想将包含某些方法的对象发送到另一台机器,那台机器需要知道通过网络流式传输的对象的类定义。最近java有了函数式接口(interface)的想法,它将在编译时为我创建该接口(interface)的实现(即MyInterfaceimpl=()->System.out.println("Stu

java - 为什么 Kafka Direct Stream 会为每条消息创建一个新的解码器?

我有一个用Java编写并使用Spark2.1的Spark流应用程序。我正在使用KafkaUtils.createDirectStream来读取来自Kafka的消息。我正在为kafka消息使用kryo编码器/解码器。我在Kafkaproperties->key.deserializer,value.deserializer,key.serializer,value.deserializer中指定了这个当Spark在微批中拉取消息时,使用kryo解码器成功解码消息。但是我注意到Spark执行程序创建了一个新的kryo解码器实例,用于解码从kafka读取的每条消息。我通过将日志放入解码器构造

java - 使用 Spring Boot 1.5 避免 Kafka Streams 在测试中启动

在我的SpringBoot应用程序的测试过程中,我遇到了一个非常烦人的问题。我有一个使用KafkaStreams并在专用配置文件中声明它们的应用程序。@EnableKafka@EnableKafkaStreams@ConfigurationpublicclassKafkaStreamConfiguration{@Bean(name=KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)publicStreamsConfigkStreamsConfigs(){//Omissis}@BeanpublicKStre

java - Apache Kafka 1.0.0 Streams API Multiple Multilevel groupby

我如何在KafkaStreamsAPI中使用具有多个约束的.groupby。与下面的Java8StreamsAPI示例相同publicvoidtwoLevelGrouping(Listpersons){finalMap>>personsByCountryAndCity=persons.stream().collect(groupingBy(Person::getCountry,groupingBy(Person::getCity)));System.out.println("PersonslivinginLondon:"+personsByCountryAndCity.get("UK"

java - SlidingWindows 用于 Apache Beam 上的慢速数据(大间隔)

我正在使用ChicagoTrafficTracker数据集,每15分钟发布一次新数据。当有新数据可用时,它表示记录与“实时”相差10-15分钟(example,查找_last_updt)。例如,在00:20,我得到时间戳为00:10的数据;在00:35,我从00:20开始;在00:50,我从00:40开始。因此,我可以“固定”获取新数据的时间间隔(每15分钟一次),尽管时间戳的时间间隔略有变化。我正在尝试在Dataflow(ApacheBeam)上使用这些数据,为此我正在玩滑动窗口。我的想法是收集和处理4个连续的数据点(4x15分钟=60分钟),理想情况下,一旦有新数据点可用,就更新我

java - 如何使用 Apache POI 在 Word .docx 文件中正确生成 RSID 属性?

我一直在使用ApachePOI来操作MicrosoftWord.docx文件——即打开最初在MicrosoftWord中创建的文档,对其进行修改,然后将其保存到新文档中。我注意到ApachePOI创建的新段落缺少修订保存ID,通常称为RSID或rsidR。Word使用它来识别在一个session中对文档所做的更改,比如在保存之间。它是可选的——用户可以根据需要在MicrosoftWord中将其关闭——但实际上几乎每个人都打开它,所以几乎每个文档都充满了RSID。阅读thisexcellentexplanationofRSIDs有关更多信息。在MicrosoftWord文档中,word/