草庐IT

Kafka-Source

全部标签

docker - 无法使用Sarama Golang软件包创建Kafka生产者客户端-“客户端/元数据在获取元数据时从代理处出错:EOF”

版本:GoLang1.10.2卡夫卡4.4.1Docker18.03.1我正在尝试使用Shopify的Sarama软件包来测试我的Kafka实例。我使用Dockercompose来站起Kafka/Zookeeper,并且一切都成功运行。当我尝试使用Sarama创建Producer客户端时,会引发错误。当我运行以下packagemainimport("fmt""log""os""os/signal""time""strconv""github.com/Shopify/sarama")funcmain(){//Setupconfigurationconfig:=sarama.NewConf

go - 在kafka中为同一主题创建多个消费者

我是新手,可以在下面的github存储库中看到一个只有一个消费者的示例,但是有什么想法可以在golang中为同一主题创建多个消费者吗?https://github.com/confluentinc/confluent-kafka-go/tree/master/examplesconfluent-kafka中是否有任何消费者工厂(生成N个消费者)可以读取相同的主题(带分区)? 最佳答案 Confluentgithubrepo中有一个示例:https://github.com/confluentinc/confluent-kafka-g

go - Golang Consumer连接Kafka后延迟接收Kafka消息

我是Golang和Kafa的新手,所以这似乎是一个愚蠢的问题。在我的Kafka消费者首次连接到Kafka服务器后,为什么在与Kafka服务器建立连接和接收第一条消息之间存在延迟(约20秒)?它在consumer.Messages()之前打印一条消息,并为收到的每条消息打印另一条消息。大约20秒的延迟在第一个fmt.Println和第二个fmt.Println之间。packagemainimport("fmt""github.com/Shopify/sarama"cluster"github.com/bsm/sarama-cluster")funcmain(){//Createtheco

去 source.cloud.google.com 获取

我有一个托管在source.cloud.google.com上的项目。我希望使用goget和gomodules来管理它。当我执行goget时,我得到以下信息:$gogetsource.cloud.google.com//gogetsource.cloud.google.com//:unrecognizedimportpath"source.cloud.google.com//"(parsehttps://source.cloud.google.com//?go-get=1:nogo-importmetatags())有人让这个工作吗? 最佳答案

Golang segmentio/kafka-go 消费者不工作

我正在使用segmentio/kafka-go连接到卡夫卡。//toproducemessagestopic:="my-topic"partition:=0conn,_:=kafka.DialLeader(context.Background(),"tcp","localhost:9092",topic,partition)conn.SetWriteDeadline(time.Now().Add(10*time.Second))conn.WriteMessages(kafka.Message{Value:[]byte("one!")},kafka.Message{Value:[]byt

go - 使用 kafka go 读取 __consumer_offsets

我想使用这个库阅读主题__consumer_offsets:https://github.com/segmentio/kafka-go我的问题是,除非我指定一个分区,否则似乎什么都不会发生。默认情况下,这个主题有100个分区,向kafka查询分区列表然后循环读取它们似乎是不合理的,我希望库中有一个预先存在的方法来读取所有分区的消息在主题中。在我用kafkacat验证__consumer_offsets主题的分区15中有消息后,目前有以下工作:r:=kafka.NewReader(kafka.ReaderConfig{Brokers:[]string{"kafka:9092"},Topi

go - 使用 Jaeger 在分布式应用程序中跟踪 Kafka 总线

我分发了包含多个Go服务的应用程序。其中一些使用Kafka作为数据总线。我能够使用Jaeger的opentracing追踪服务之间的调用。我在图表上绘制Kafka跨度时遇到问题,它们显示为间隙。这是我能做的。初始跨度由gRPC中间件创建。生产方:...kafkaMsg:=kafka.Message{Key:[]byte(key),Value:msgBytes}headers:=make(map[string]string)ifspan:=opentracing.SpanFromContext(ctx);span!=nil{opentracing.GlobalTracer().Injec

go - 重消费Kafka消息的可能原因

昨天从日志中发现,kafkagroupcoordinator发起grouprebalance后,kafka重新消费了一些消息。这些消息已在两天前使用(从日志中确认)。日志中报告了另外两个重新平衡,但它们不再重新使用消息。那么为什么第一次reblancing会导致重新消费消息呢?有什么问题?我使用的是golangkafka客户端。这是代码config:=sarama.NewConfig()config.Version=versionconfig.Consumer.Offsets.Initial=sarama.OffsetOldest而且我们在声明消息之前处理消息,因此我们似乎正在为kaf

go - 如何修复 `kafka: client has run out of available brokers to talk to (Is your cluster reachable?)` 错误

我正在开发一个应用程序,该应用程序从sqs队列中读取一条消息,对该数据执行一些操作,然后获取结果并将其发布到kafka主题。为了在本地进行测试,我想在我的docker构建中设置一个kafka图像。我目前能够使用docker-compose在本地启动aws-cli、localstack和我的应用程序的容器。另外,我也可以毫无问题地启动kafka和zookeper。我无法让我的应用程序与kafka通信。我试过使用两个单独的撰写文件,也尝试过网络。最后,我引用了:https://rmoff.net/2018/08/02/kafka-listeners-explained/。这是我的docke

go - 使用 kafka-go 计划在 Kafka 中创建消费者

我是kafka的新手,目前正在研究它。我在golang中使用kafka-go来创建生产者和消费者。目前我能够创建一个生产者,但我希望一旦创建了一个主题的生产者而不是每次都创建消费者。意味着对于每个主题,只创建一次消费者。此外,当需要为主题创建更多消费者以平衡负载时,它会被创建。有没有办法通过goroutines或Faktory来安排它? 最佳答案 你不应该有耦合的生产者/消费者,Kafka让你有完全解耦的生产者/消费者。即使主题不存在,您也可以运行您的消费者(Kafka将创建它,您只会收到一个领导者不可用警告),并在您需要时运行您的