草庐IT

KAFKA_HOME

全部标签

go - 使用 kafka go 读取 __consumer_offsets

我想使用这个库阅读主题__consumer_offsets:https://github.com/segmentio/kafka-go我的问题是,除非我指定一个分区,否则似乎什么都不会发生。默认情况下,这个主题有100个分区,向kafka查询分区列表然后循环读取它们似乎是不合理的,我希望库中有一个预先存在的方法来读取所有分区的消息在主题中。在我用kafkacat验证__consumer_offsets主题的分区15中有消息后,目前有以下工作:r:=kafka.NewReader(kafka.ReaderConfig{Brokers:[]string{"kafka:9092"},Topi

go - 使用 Jaeger 在分布式应用程序中跟踪 Kafka 总线

我分发了包含多个Go服务的应用程序。其中一些使用Kafka作为数据总线。我能够使用Jaeger的opentracing追踪服务之间的调用。我在图表上绘制Kafka跨度时遇到问题,它们显示为间隙。这是我能做的。初始跨度由gRPC中间件创建。生产方:...kafkaMsg:=kafka.Message{Key:[]byte(key),Value:msgBytes}headers:=make(map[string]string)ifspan:=opentracing.SpanFromContext(ctx);span!=nil{opentracing.GlobalTracer().Injec

go - 重消费Kafka消息的可能原因

昨天从日志中发现,kafkagroupcoordinator发起grouprebalance后,kafka重新消费了一些消息。这些消息已在两天前使用(从日志中确认)。日志中报告了另外两个重新平衡,但它们不再重新使用消息。那么为什么第一次reblancing会导致重新消费消息呢?有什么问题?我使用的是golangkafka客户端。这是代码config:=sarama.NewConfig()config.Version=versionconfig.Consumer.Offsets.Initial=sarama.OffsetOldest而且我们在声明消息之前处理消息,因此我们似乎正在为kaf

go - 如何修复 `kafka: client has run out of available brokers to talk to (Is your cluster reachable?)` 错误

我正在开发一个应用程序,该应用程序从sqs队列中读取一条消息,对该数据执行一些操作,然后获取结果并将其发布到kafka主题。为了在本地进行测试,我想在我的docker构建中设置一个kafka图像。我目前能够使用docker-compose在本地启动aws-cli、localstack和我的应用程序的容器。另外,我也可以毫无问题地启动kafka和zookeper。我无法让我的应用程序与kafka通信。我试过使用两个单独的撰写文件,也尝试过网络。最后,我引用了:https://rmoff.net/2018/08/02/kafka-listeners-explained/。这是我的docke

go - 使用 kafka-go 计划在 Kafka 中创建消费者

我是kafka的新手,目前正在研究它。我在golang中使用kafka-go来创建生产者和消费者。目前我能够创建一个生产者,但我希望一旦创建了一个主题的生产者而不是每次都创建消费者。意味着对于每个主题,只创建一次消费者。此外,当需要为主题创建更多消费者以平衡负载时,它会被创建。有没有办法通过goroutines或Faktory来安排它? 最佳答案 你不应该有耦合的生产者/消费者,Kafka让你有完全解耦的生产者/消费者。即使主题不存在,您也可以运行您的消费者(Kafka将创建它,您只会收到一个领导者不可用警告),并在您需要时运行您的

Golang Kafka 不消耗所有消息 offsetnewest

第一批:-我正在尝试从100个平面文件中提取数据并将其加载到一个数组中,然后将它们作为字节数组一个一个地插入到kafka生产者中。第二批:-我从kafka消费者消费,然后将它们插入NoSQL数据库。我在Kafka的shopifysaramagolang包的配置文件中使用了Offsetnewset。我可以接收消息并将消息插入到kafka,但在消费时我只收到第一条消息。因为我在sarama配置中提供了最新的Offset。我怎样才能得到这里的所有数据。 最佳答案 如果没有任何代码或关于如何配置kafka的更深入的解释(即:主题、分区等),

go - Kafka 0.11/Golang Sarama 版本支持

我花了一些时间发现连接到Kafka0.11集群的Go应用程序使用的是旧的0.8.2版本的库,它在响应中缺少时间戳值。然后我发现不支持Kafka0.11.xAPI/版本(但他们正在努力)。我现在有两个解决方案。首先是在我的应用程序中明确设置所需的版本。其次是“调整”Sarama代码以使用版本0.10.x作为最低版本,使我能够使用所有0.10.xAPI/功能。我还在想为什么版本不是从我正在连接的Kafka代理中获取的?我无法从代码中理解它应该如何工作...我清楚地看到在sarama.Config.Version中设置或定义的版本,但我无法在连接后找到任何更新此值的内容给经纪人?我知道Pyt

go - Confluent Kafka Golang 客户端生产者 "Broker: Not enough in-sync replicas"

我正在尝试测试生产者使用Golang客户端向kafka集群上的主题写入消息。这可以很好地写入本地集群上的主题,我只是复制并粘贴了他们的示例代码githubrepo.packagemainimport("fmt""gopkg.in/confluentinc/confluent-kafka-go.v1/kafka")funcmain(){p,err:=kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers":"localhost"})iferr!=nil{panic(err)}deferp.Close()//Deliveryreporth

go - 如何使用 Sarama Go Kafka Consumer 从最新的偏移量中消费

我有三个问题:“最早的偏移量”是什么意思?最早的偏移量并不意味着偏移量为0?//OffsetOldeststandsfortheoldestoffsetavailableonthebrokerfora//partition.OffsetOldestint64=-2假设一个。三个代理在同一台机器上运行B、消费组只有一个消费线程C.消费者配置OffsetOldest标志。D.已经产生了100条消息,目前消费者线程已经消耗了90条消息。那么如果消费者线程重启了,那么这个消费者会从哪个offset开始消费呢?是91还是0?在我们下面的代码中,似乎每次启动消费者时都会重新消费消息。但实际上它确实

docker - 如何使用主机上的golang访问安装在docker中的kafka

我需要使用golang来访问kafka,所以我在docker中安装了kafka和zookepper。1.这里是kafka安装脚本:#pullimagesdockerpullwurstmeister/zookeeperdockerpullwurstmeister/kafka#runkafka&zookepperdockerrun-d--namezookeeper-p2181-twurstmeister/zookeeperdockerrun--namekafka-eHOST_IP=localhost-eKAFKA_ADVERTISED_PORT=9092-eKAFKA_BROKER_ID=