草庐IT

与卡夫卡消费者一起去 channel

coder 2024-07-09 原文

我刚开始学习 channel 。我正在使用汇合的 kafka 消费者来创建功能性消费者。我想要完成的是将消息发送到缓冲 channel (2,000)...然后使用管道将 channel 中的消息写入 redis。我已经通过执行 println 来让消费者部分工作了一条一条地发送消息,直到它到达偏移量的末尾,但是当我尝试添加一个 channel 时,它似乎命中了 default: switch 中的案例然后就卡住了。

我似乎也没有正确填写 channel ?这fmt.Println("count is: ", len(redisChnl))总是打印 0

这是我目前所拥有的:

// Example function-based high-level Apache Kafka consumer
package main

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "os"
    "os/signal"
    "syscall"
    "time"
    "encoding/json"
    "regexp"
    "github.com/go-redis/redis"
    "encoding/binary"
)

var client *redis.Client

func init() {
    client = redis.NewClient(&redis.Options{
        Addr:         ":6379",
        DialTimeout:  10 * time.Second,
        ReadTimeout:  30 * time.Second,
        WriteTimeout: 30 * time.Second,
        PoolSize:     10,
        PoolTimeout:  30 * time.Second,
    })
    client.FlushDB()
}

type MessageFormat struct {
    MetricValueNumber float64     `json:"metric_value_number"`
    Path              string      `json:"path"`
    Cluster           string      `json:"cluster"`
    Timestamp         time.Time   `json:"@timestamp"`
    Version           string      `json:"@version"`
    Host              string      `json:"host"`
    MetricPath        string      `json:"metric_path"`
    Type              string      `json:"string"`
    Region            string      `json:"region"`
}

//func redis_pipeline(ky string, vl string) {
//  pipe := client.Pipeline()
//
//  exec := pipe.Set(ky, vl, time.Hour)
//
//  incr := pipe.Incr("pipeline_counter")
//  pipe.Expire("pipeline_counter", time.Hour)
//
//  // Execute
//  //
//  //     INCR pipeline_counter
//  //     EXPIRE pipeline_counts 3600
//  //
//  // using one client-server roundtrip.
//  _, err := pipe.Exec()
//  fmt.Println(incr.Val(), err)
//  // Output: 1 <nil>
//}

func main() {


    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":               "kafka.com:9093",
        "group.id":                        "testehb",
        "security.protocol":               "ssl",
        "ssl.key.location":                "/Users/key.key",
        "ssl.certificate.location":        "/Users/cert.cert",
        "ssl.ca.location":                 "/Users/ca.pem",
    })

    if err != nil {
        fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
        os.Exit(1)
    }

    fmt.Printf("Created Consumer %v\n", c)

    err = c.SubscribeTopics([]string{"jmx"}, nil)

    redisMap := make(map[string]string)

    redisChnl := make(chan []byte, 2000)

    run := true

    for run == true {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        default:
            ev := c.Poll(100)
            if ev == nil {
                continue
            }

            switch e := ev.(type) {
            case *kafka.Message:

                //fmt.Printf("%% Message on %s:\n%s\n",
                //  e.TopicPartition, string(e.Value))
                if e.Headers != nil {
                    fmt.Printf("%% Headers: %v\n", e.Headers)
                }

                str := e.Value
                res := MessageFormat{}
                json.Unmarshal([]byte(str), &res)


                fmt.Println("size", binary.Size([]byte(str)))

                host:= regexp.MustCompile(`^([^.]+)`).FindString(res.MetricPath)

                redisMap[host] = string(str)
                fmt.Println("count is: ", len(redisChnl)) //this always prints "count is:  0"

                redisChnl <- e.Value //I think this is the write way to put the messages in the channel?

            case kafka.PartitionEOF:
                fmt.Printf("%% Reached %v\n", e)
            case kafka.Error:
                fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
                run = false
            default:
                fmt.Printf("Ignored %v\n", e)
            }

            <- redisChnl // I thought I could just empty the channel like this once the buffer is full?


        }
    }

    fmt.Printf("Closing consumer\n")
    c.Close()
}

--------编辑--------

好的,我想我是通过移动 <- redisChnl 让它工作的里面default , 但现在我看到 count before readcount after readdefault里面总是打印 2,000 ...我本以为第一个count before read = 2,000然后 count after read = 0因为那时 channel 将是空的??

    select {
    case sig := <-sigchan:
        fmt.Printf("Caught signal %v: terminating\n", sig)
        run = false
    default:
        ev := c.Poll(100)
        if ev == nil {
            continue
        }

        switch e := ev.(type) {
        case *kafka.Message:

            //fmt.Printf("%% Message on %s:\n%s\n",
            //  e.TopicPartition, string(e.Value))
            if e.Headers != nil {
                fmt.Printf("%% Headers: %v\n", e.Headers)
            }

            str := e.Value
            res := MessageFormat{}
            json.Unmarshal([]byte(str), &res)


            //fmt.Println("size", binary.Size([]byte(str)))

            host:= regexp.MustCompile(`^([^.]+)`).FindString(res.MetricPath)

            redisMap[host] = string(str)

            go func() {
                redisChnl <- e.Value
            }()


        case kafka.PartitionEOF:
            fmt.Printf("%% Reached %v\n", e)
        case kafka.Error:
            fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
            run = false
        default:
            fmt.Println("count before read: ", len(redisChnl))

            fmt.Printf("Ignored %v\n", e)

            <-redisChnl

            fmt.Println("count after read: ", len(redisChnl)) //would've expected this to be 0

        }


    }

最佳答案

我认为简化此代码的更大方法是将管道分成多个 goroutine。

channel 的优点是多人可以同时在上面书写和阅读。在这个例子中,这可能意味着有一个 go 例程在 channel 上排队,另一个从 channel 中取出并将东西放入 redis。

像这样:

c := make(chan Message, bufferLen)
go pollKafka(c)
go pushToRedis(c)

如果你想添加批处理,你可以添加一个从 kafka channel 轮询的中间阶段,并附加到一个 slice 直到 slice 已满,然后将该 slice 排入 redis 的 channel 。

如果这样的并发性不是目标,那么用 slice 替换代码中的 channel 可能会更容易。如果只有 1 个 goroutine 作用于一个对象,那么尝试使用 channel 并不是一个好主意。

关于与卡夫卡消费者一起去 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50301067/

有关与卡夫卡消费者一起去 channel的更多相关文章

  1. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  2. ruby-on-rails - 如果我将 ruby​​ 版本 2.5.1 与 rails 版本 2.3.18 一起使用会怎样? - 2

    如果我使用ruby​​版本2.5.1和Rails版本2.3.18会怎样?我有基于rails2.3.18和ruby​​1.9.2p320构建的rails应用程序,我只想升级ruby的版本,而不是rails,这可能吗?我必须面对哪些挑战? 最佳答案 GitHub维护apublicfork它有针对旧Rails版本的分支,有各种变化,它们一直在运行。有一段时间,他们在较新的Ruby版本上运行较旧的Rails版本,而不是最初支持的版本,因此您可能会发现一些关于需要向后移植的有用提示。不过,他们现在已经有几年没有使用2.3了,所以充其量只能让更

  3. ruby-on-rails - 如何让 datamapper 与 postgresql 数据库一起工作? - 2

    我已经找到了几个使用datamapper的示例,并且能够让它们正常工作。不过,所有这些示例都是针对sqlite数据库的。我正在尝试将数据映射器与postgresql一起使用。我将datamapper中的调用从sqlite3更改为postgres,并且我已经安装了dm-postgres-adapter。但它仍然不起作用。我还需要做什么? 最佳答案 与SQLite不同,PostgreSQL不将数据库存储在单个文件中。在你拥有createdyourdatabase之后,尝试这样的事情:DataMapper.setup:default,{:

  4. ruby - 了解在 Ruby 中与 lambda 一起使用的 inject 行为 - 2

    我经常将预配置的lambda插入可枚举的方法中,例如“map”、“select”等。但是“注入(inject)”的行为似乎有所不同。例如与mult4=lambda{|item|item*4}然后(5..10).map&mult4给我[20,24,28,32,36,40]但是,如果我制作一个2参数lambda用于像这样的注入(inject),multL=lambda{|product,n|product*n}我想说(5..10).inject(2)&multL因为“inject”有一个可选的单个初始值参数,但这给了我......irb(main):027:0>(5..10).inject

  5. ruby-on-rails - 与 ActiveMerchant 一起使用的最佳支付网关是什么? - 2

    我需要使用ActiveMerchant库在我们的一个Rails应用程序中设置支付解决方案。尽管这个问题非常主观,但人们对主要网关(BrainTree、Authorize.net等)的体验如何?它必须:处理定期付款。有能力记入个人帐户。能够取消付款。有办法存储用户的付款详细信息(例如Authotize.netsCIM)。干杯 最佳答案 ActiveMerchant很棒,但在过去一年左右的时间里,我在使用它时发现了一些问题。首先,虽然某些网关可能会得到“支持”——但并非所有功能都包含在内。查看功能矩阵以确保完全支持您选择的网关-http

  6. ruby-on-rails - 将 acts_as_list 与 has_many 一起使用 :through in rails - 2

    我有一个Rails应用程序,我正在尝试使用acts_as_list插件设置可排序列表。数据库中的位置字段正在更新,但是在呈现页面时,不考虑顺序。我想我是在寻求帮助。这是我的模型...classQuestionMembership:question_membershipsendclassQuestion:question_membershipsacts_as_listend还有给我列表的草率View代码...>true)%>拖放用于重新排序。数据库中QuestionMembership对象的位置值更新,页面实际上正确显示重新排序。问题是在页面重新加载时,它默认返回到它感觉的任何顺序。我认

  7. ruby - 将 each_with_index 与 map 一起使用 - 2

    我想获取一个数组并将其作为订单列表。目前我正在尝试以这种方式进行:r=["a","b","c"]r.each_with_index{|w,index|puts"#{index+1}.#{w}"}.map.to_a#1.a#2.b#3.c#=>["a","b","c"]输出应该是["1.a","2.b","3.c"]。如何让正确的输出成为r数组的新值? 最佳答案 a.to_enum.with_index(1).map{|element,index|"#{index}.#{element}"}或a.map.with_index(1){|

  8. ruby-on-rails - 将 restclient 与多部分帖子一起使用 - 2

    我将restclient用于多部分表单,以将数据发送到restfulweb服务(它是Panda视频编码服务)。不过,诀窍在于我传递给restclient(Technoweenie分支)的文件来自用户提交的我自己的表单。那么,让我们来看看这个。用户将文件发布到我的Rails应用程序。在我的Controller中,它从params[:file]接收文件。然后我想使用RestClient将params[:file]传递给Panda。我在Panda服务器上遇到的错误如下。我注意到堆栈跟踪中的文件参数也在一个字符串中(我假设Panda将其转换为字符串以获得更好的堆栈跟踪)。~Startedreq

  9. ruby-on-rails - 我将 Rails3 与 tinymce 一起使用。如何呈现用户关闭浏览器javascript然后输入xss? - 2

    我有一个用Rails3编写的站点。我的帖子模型有一个名为“内容”的文本列。在帖子面板中,html表单使用tinymce将“content”列设置为textarea字段。在首页,因为使用了tinymce,post.html.erb的代码需要用这样的原始方法来实现。.好的,现在如果我关闭浏览器javascript,这个文本区域可以在没有tinymce的情况下输入,也许用户会输入任何xss,比如alert('xss');.我的前台会显示那个警告框。我尝试sanitize(@post.content)在posts_controller中,但sanitize方法将相互过滤tinymce样式。例如

  10. ruby - 将 Launchd 与 Mavericks 和 Ruby 一起使用 - 2

    升级到Mavericks,现在启动/Lingon无法启动Ruby脚本。我将文件设置为可执行文件(使用chmod+x),并将第一行设置为#!/usr/bin/ruby但是,我在控制台中不断收到以下错误:com.apple.launchd.peruser.501[169](craig.logging[754]):Exitedwithcode:1这是一个权限错误,但我不知道要修复或更改什么权限。该脚本在带有ruby​​的终端中运行良好。这让我抓狂。更新:导致问题的Ruby脚本将它们的输出写入另一个文件,例如,在我的Dropbox中。但是我以我自己的身份运行launchd文件,我自己拥有写入这

随机推荐