spark-streaming-kafka
全部标签 我需要将RESTAPI调用的输出推送到KAFKA。Restapi返回json输出,其中包含支持信息以及数据输出到json.RawMessagetypeResponsestruct{RequestIDstring`json:"requestId"`Successbool`json:"success"`NextPageTokenstring`json:"nextPageToken,omitempty"`MoreResultbool`json:"moreResult,omitempty"`Errors[]struct{Codestring`json:"code"`Messagestring`
我正在为我的消费者使用sarama(https://github.com/Shopify/sarama/)和Kafka0.8.0。这是我的代码的样子:consumerLoop:for{select{caseevent:=我正在使用缓冲channel(c.sem)来控制一次可以运行多少个processJobgoroutine。这就是我控制消费者的并发/速度的方式。我在使用这种方法时遇到的问题是,如果我需要更改并发性,我必须关闭使用者并重新启动它(channel缓冲区大小是一个命令行标志)。我记录了已处理的偏移量,我必须查看我的日志以确定处理了哪些偏移量以及我希望消费者从哪里恢复。我想要一
我们使用Go的confluentkafka包测试了具有2和3个消费者的消费者组(知道我们将来可能会有更多消费者)。每个主题有10个分区,消息在所有消费者之间拆分。每个主题有5个分区,但不知何故只有一个消费者获取消息。知道为什么会出现这种行为吗? 最佳答案 您可以尝试使用此命令查看分区分配情况,并将结果添加到此处吗?bin/kafka-consumer-groups.sh--bootstrap-server:9092--describe--group--members--verbose默认情况下,Kafka使用范围分区方案,因此有时会
这是我正在运行的代码片段:err:=godotenv.Load()iferr!=nil{log.Fatal("Errorloading.envfile")}broker:=os.Getenv("BROKER")topic:=os.Getenv("TOPIC")username:=os.Getenv("USERNAME")password:=os.Getenv("PASSWORD")calocation:=os.Getenv("CALOCATION")p,err:=kafka.NewProducer(&kafka.ConfigMap{"metadata.broker.list":brok
我正在从C程序写入SOCK_STREAM正在从go程序监听的Unix域套接字,使用net.Listen("unix",sockname).当我将套接字设置为O_NONBLOCK使用fcntl(),我看到C程序在第一次写入时只写入了8192字节。失败后,我监控并回写剩余数据,但我服务器上读取的数据在这种情况下是无效的。当我不使用O_NONBLOCK时,然后整个8762字节被写在一个单一的写入中,一切都按预期工作。C客户端套接字连接if((fd=socket(AF_UNIX,SOCK_STREAM,0))==-1){return;}intflags=fcntl(fd,F_GETFL,0);
我在Golang代码中使用knifebootstrap命令来引导节点。有时食谱中有等待,等待超过10分钟。在这种情况下,我收到错误“错误:IOStream关闭”。有什么办法可以增加时间限制吗?我也在不断地从cmd的输出和错误流中读取日志。funcexecuteCMDWorkstation(cmd*exec.Cmd,projectId,cr_idstring)bool{stdout,err3:=cmd.StdoutPipe()iferr3!=nil{utils.Info.Println(err3)returnfalse}deferstdout.Close()stderr,err4:=
这是我尝试实现一个简单的微服务,它应该从kafka服务器读取消息并通过HTTP发送它。当我从终端运行它时它工作正常,但是当部署到docker上时它会出现panicpanic:runtimeerror:invalidmemoryaddressornilpointerdereference[signalSIGSEGV:segmentationviolationcode=0x1addr=0x40pc=0x7b6345]goroutine12[running]:main.kafkaRoutine.func1(0xc420174060,0x0,0x0)/go/src/github.com/dea
版本:GoLang1.10.2卡夫卡4.4.1Docker18.03.1我正在尝试使用Shopify的Sarama软件包来测试我的Kafka实例。我使用Dockercompose来站起Kafka/Zookeeper,并且一切都成功运行。当我尝试使用Sarama创建Producer客户端时,会引发错误。当我运行以下packagemainimport("fmt""log""os""os/signal""time""strconv""github.com/Shopify/sarama")funcmain(){//Setupconfigurationconfig:=sarama.NewConf
我是新手,可以在下面的github存储库中看到一个只有一个消费者的示例,但是有什么想法可以在golang中为同一主题创建多个消费者吗?https://github.com/confluentinc/confluent-kafka-go/tree/master/examplesconfluent-kafka中是否有任何消费者工厂(生成N个消费者)可以读取相同的主题(带分区)? 最佳答案 Confluentgithubrepo中有一个示例:https://github.com/confluentinc/confluent-kafka-g
我是Golang和Kafa的新手,所以这似乎是一个愚蠢的问题。在我的Kafka消费者首次连接到Kafka服务器后,为什么在与Kafka服务器建立连接和接收第一条消息之间存在延迟(约20秒)?它在consumer.Messages()之前打印一条消息,并为收到的每条消息打印另一条消息。大约20秒的延迟在第一个fmt.Println和第二个fmt.Println之间。packagemainimport("fmt""github.com/Shopify/sarama"cluster"github.com/bsm/sarama-cluster")funcmain(){//Createtheco