草庐IT

go - 我的 Golang 应用程序中是否需要一个或多个 sarama.SyncProducer?

我是Golang的新手,我需要编写将事件发布到kafka的应用程序,我找不到以下问题的答案:我需要多少个sarama.SyncProducer?可以在所有应用中使用一个吗?我应该有某种生产者池吗? 最佳答案 除非您以比发布者发布它的速度高得多的速度发布数据(?),因为sarama发布者是asynchronousandconcurrent,我会说你不需要一个以上的出版商。所以直接回答你的问题:我会选择一个,但不知道您的要求。是的。我认为人们更有可能需要一群消费者,当然,消息发布率很高。 关

go - 我的 Golang 应用程序中是否需要一个或多个 sarama.SyncProducer?

我是Golang的新手,我需要编写将事件发布到kafka的应用程序,我找不到以下问题的答案:我需要多少个sarama.SyncProducer?可以在所有应用中使用一个吗?我应该有某种生产者池吗? 最佳答案 除非您以比发布者发布它的速度高得多的速度发布数据(?),因为sarama发布者是asynchronousandconcurrent,我会说你不需要一个以上的出版商。所以直接回答你的问题:我会选择一个,但不知道您的要求。是的。我认为人们更有可能需要一群消费者,当然,消息发布率很高。 关

string - 如何使用 map[string]*string

我正在尝试使用sarama(管理员模式)创建主题。没有ConfigEntries工作正常。但我需要定义一些配置。我设置了主题配置(这里发生了错误):tConfigs:=map[string]*string{"cleanup.policy":"delete","delete.retention.ms":"36000000",}但是我得到一个错误:./main.go:99:28:cannotuse"delete"(typestring)astype*stringinmapvalue./main.go:100:28:cannotuse"36000000"(typestring)astype*

string - 如何使用 map[string]*string

我正在尝试使用sarama(管理员模式)创建主题。没有ConfigEntries工作正常。但我需要定义一些配置。我设置了主题配置(这里发生了错误):tConfigs:=map[string]*string{"cleanup.policy":"delete","delete.retention.ms":"36000000",}但是我得到一个错误:./main.go:99:28:cannotuse"delete"(typestring)astype*stringinmapvalue./main.go:100:28:cannotuse"36000000"(typestring)astype*

go - 在 sarama-cluster 中模拟 NewConsumer

有没有办法在不设置实际代理的情况下测试/模拟sarama-cluster的NewConsumer函数?我在这里缺少什么?我要测试的代码:importcluster"github.com/bsm/sarama-cluster"funcinitSaramaConsumer()(*cluster.Consumer,error){brokers:=[]string{"some_url:port"}groups:="some_group"topics:=[]string{"some_topic"}config:=cluster.NewConfig()saramaConsumer,err:=clu

go - 在 sarama-cluster 中模拟 NewConsumer

有没有办法在不设置实际代理的情况下测试/模拟sarama-cluster的NewConsumer函数?我在这里缺少什么?我要测试的代码:importcluster"github.com/bsm/sarama-cluster"funcinitSaramaConsumer()(*cluster.Consumer,error){brokers:=[]string{"some_url:port"}groups:="some_group"topics:=[]string{"some_topic"}config:=cluster.NewConfig()saramaConsumer,err:=clu

go - 为什么 Shopify Sarama 消费者需要分区来消费消息

我很抱歉发布与Kafka图书馆相关的问题,因为没有多少人对图书馆的特定问题感兴趣。但是这个库是golang-Kafka实现中最常用的库之一。我想使用Sarama库创建一个简单的消费者来监听一个主题。据我所知,在高级KafkaAPI中,如果未指定特定分区,默认情况下消费者会监听所有主题分区。但是,在此库中,Consumer接口(interface)只有ConsumePartition函数,其中分区是必需的参数。函数的签名是:ConsumePartition(topicstring,partitionint32,offsetint64)(PartitionConsumer,error)这让

go - 如何通过 shopify sarama 抵消来处理消费者恢复

我读到kafka提供了一个消费者客户端库,它允许通过在zookeeper中保存最后一次读取的偏移量来恢复(不能100%确定它的存储位置)。是否可以对Sarama消费者做同样的事情?假设我正在读取直到偏移量550,我的消费者崩溃了5分钟,我们现在处于偏移量700,但我想从偏移量550恢复消费。这有可能不用我自己保存状态吗?我会假设它确实如此,但我不明白如何。我找到了sarama.OffsetNewest/Oldest,但这不是我要找的... 最佳答案 Kafka消费者过去将偏移量存储在Zookeeper中,但现在他们将它们直接存储在K

go - 如何使用 Sarama 在多个 goroutine 中消费 Kafka 主题?

我使用https://github.com/Shopify/sarama用于与Kafka交互。我有一个主题,例如,100个分区。我有一个部署在1台主机上的应用程序。所以,我想在多个goroutine中使用这个主题。我看到这个例子-https://github.com/Shopify/sarama/blob/master/examples/consumergroup/main.go,在其中我们可以看到,如何在特定的消费者组中创建消费者。所以,我的问题是,我应该创建多个这样的消费者,还是在Sarama中有一些设置,我可以在其中设置所需数量的消费者goroutine。附言我看到这个问题-ht

go - Sarama 无法与 Kafka 服务器通信

所以我正在尝试配置Sarama(kafka的原生go客户端)生产者客户端。我相应地配置了我的TLS,确保使用正确的密码生成客户端证书。我用来初始化客户端的Go代码如下所示:import("crypto/tls""crypto/x509""encoding/pem""io/ioutil""net""path/filepath""github.com/Shopify/sarama"log"github.com/sirupsen/logrus")const(certFile="client_ingestion_client.pem"keyFile="client_ingestion_clie