草庐IT

Kafka-eagle

全部标签

java - kafka如何平衡分区负载?

我遇到了一个关于kafka负载均衡的问题。因此,我创建了一个包含10个分区的主题并创建了2个消费者。10个分区被划分并分配给这些消费者(5个分区分配给第一个,5个分区分配给第二个)并且工作正常。有时第一个消费者工作,有时第二个。但有时我们可能会遇到这样的情况,例如第二个消费者收到一条消息,并且需要时间(例如10分钟)来处理这条消息。那么,我的问题是kafka将如何决定将消息存储到哪个分区?在这种情况下,我认为循环法不是一个好主意,因为由第二个消费者处理的分区中的消息将不会被处理,直到第二个消费者完成长时间的工作。已更新!根据@MilanBaran的回答,生产者端的负载是平衡的。但在这种

java - Kafka 消费者异常和抵消提交

我一直在尝试为SpringKafka做一些POC工作。具体来说,我想尝试在Kafka中消费消息时处理错误的最佳实践。我想知道是否有人能够提供帮助:分享有关Kafka消费者应该做什么的最佳做法当出现故障时帮助我了解AckModeRecord的工作原理,以及如何在监听器方法中抛出异常时防止提交到Kafka偏移队列。2的代码示例如下:鉴于AckMode设置为RECORD,根据documentation:committheoffsetwhenthelistenerreturnsafterprocessingtherecord.如果监听器方法抛出异常,我会认为偏移量不会增加。但是,当我使用下面的

2步窗的聚合,带有Kafka流DSL

假设我有一个由每秒1个数据点组成的流“流-1”,我想计算一个派生的流“stream-5”,该流使用5秒的跳窗口和另一个流式的“stream-10”包含总和它基于“stream-5”,其中包含10秒的跳窗口。需要分别为每个密钥完成聚合,我希望能够在不同的过程中运行每个步骤。如果Stream-5和Stream-10包含相同键/时间戳的更新,则本身并不是问题(因此我不一定需要如何发送最终的kafka-streams聚合结果,该结果是窗口窗口的ktable?)只要最后值正确。是否有一种(简单)使用高级Kafka流DSL解决此问题的方法?到目前为止,由于汇总,我还没有看到一种优雅的方式来处理Stream

51.Go操作kafka示例(kafka-go库)

文章目录一、简介二、生产者三、消费者代码地址:https://gitee.com/lymgoforIT/golang-trick/tree/master/31-kafka-go一、简介之前已经介绍过一个操作kafka的go库了,28.windows安装kafka,Go操作kafka示例(sarama库),但是这个库比较老了,当前比较流行的库是github.com/segmentio/kafka-go,所以本次我们就使用一下它。我们在GitHub直接输入kafka并带上language标签为Go时,可以可以看到当前getgithub.com/segmentio/kafka-go库是最流行的。首先

java - Kafka Streams - 处理超时

我正在尝试使用.process()用TimeWindows.of("name",30000)批处理一些KTable值并发送它们。似乎30秒超过了消费者超时间隔,在此之后Kafka认为该消费者已失效并释放分区。我已经尝试提高轮询和提交间隔的频率来避免这种情况:config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,"5000");config.put(StreamsConfig.POLL_MS_CONFIG,"5000");不幸的是,这些错误仍在发生:(很多)ERRORo.a.k.s.p.internals.RecordCollector-E

Kafka SASL_SSL双重认证

文章目录1.背景2.环境3.操作步骤3.1生成SSL证书3.2配置zookeeper认证3.3配置kafka安全认证3.4使用kafka客户端进行验证3.5使用Java端代码进行认证1.背景kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。SASL:是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。SSL:是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。在Kafka中启用SASL_SSL安全协议时,SASL用于客户

java - 如何运行以下命令来测试kafka服务器是否安装正确?

我已经安装了Kafka和zookeeper。动物园管理员工作正常。但是,当我尝试运行Kafka服务器时,出现以下错误。请帮我解决这个问题。先感谢您!!!从C:\kafka-0.9.0.1运行的命令:.\bin\windows\kafka-server-start.bat.\config\server.properties错误信息:类路径为空。请先构建项目,例如通过运行“gradlewjarAll” 最佳答案 当你安装Kafka时,你是从源码下载还是二进制下载?下载源代码分发时会出现此问题。要解决此问题,请通过二进制下载链接下载:Ka

java - 如何选择一个Kafka transaction.id

我想知道我能否在理解Kafka中的交易方面获得帮助,尤其是如何使用transaction.id。这里是上下文:我的Kafka应用程序遵循以下模式:使用来自输入主题的消息,进行处理,然后发布到输出主题。我不使用KafkaStreamsAPI。我在一个消费者组中有多个消费者,每个消费者都在自己的轮询线程中。有一个带有工作线程的线程池,该线程用于执行消息处理并将其发布到输出主题。目前,每个线程都有自己的生产者实例。我正在使用已发布的事务API,以确保消耗偏移量的更新和对输出主题的发布原子地进行到目前为止,我的假设包括:如果我的进程在中间事务中崩溃,那么该事务中的任何内容都不会发布,也不会消耗

Kafka 之 AdminClient 配置

目录一.前言二.AdminClient原理和功能2.1.AdminClient原理2.2. AdminClient功能三. AdminClient配置四. Kafka>=2.0.0 版本五.Kafka>= 2.1.0版本六. Kafka>=2.7 版本一.前言  一般情况下,我们都习惯使用kafka-topics.sh脚本来管理主题,但有些时候我们希望将主题管理类的功能集成到公司内部的系统中,打造集管理、监控、运维、告警为一体的生态平台,那么就需要以程序调用API的方式去实现。  Kafka社区于0.11版本正式推出了Java客户端版的AdminClient,并不断地在后续的版本中对它进行完善

java - Kafka - TimestampExtractor 的问题

我使用org.apache.kafka:kafka-streams:0.10.0.1我正在尝试使用一个基于时间序列的流,它似乎不会触发KStream.Process()来触发(“标点符号”)。(引用here)在KafkaStreams配置中,我传递了这个参数(以及其他参数):config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,EventTimeExtractor.class.getName());这里,EventTimeExtractor是一个自定义时间戳提取器(实现了org.apache.kafka.streams.pr