草庐IT

go - RabbitMQ 消费者性能 - 预取与并发

coder 2024-07-08 原文

我有一个 Go 应用程序处理来自单个 RabbitMQ 队列的事件。我用 github.com/streadway/amqp RabbitMQ 客户端库。

Go 应用程序在大约 2-3 秒内处理每条消息。如果我从内存中输入消息,则可以并行处理 ~1000 条甚至更多消息。 但是,不幸的是,RabbitMQ 的性能更差。 所以,我想更快地使用队列中的消息。

因此,问题是:如何使用 github.com/streadway/amqp 以最有效的方式使用消息?

据我了解,有两种方法:

  1. 设置高预取

    https://godoc.org/github.com/streadway/amqp#Channel.Qos .

    使用单个消费者协程

    示例代码:

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

ch.Qos(
        10000,           // prefetch count
        0,               // prefetch size
        false,           // global
    )

msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  false,  // NO auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)

for d := range msgs {
  log.Printf("Received a message: %s", d.Body)
  err:= processMessage(d)
  if err != nil {
      log.Printf("%s : while consuming task", err)
      d.Nack(false, true)
  } else {
      d.Ack(false)
  }
  continue // consume other messages
}

但是 processMessage 会在这里并行调用吗?

  1. 产生许多 channel 并使用多个消费者
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
var i = 0
for i = 0; i<=100; i++ {
  go func(){
      ch, err := conn.Channel()
      failOnError(err, "Failed to open a channel")
      defer ch.Close()

      ch.Qos(
            10,           // prefetch count
            0,               // prefetch size
            false,           // global
    )

      msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // NO auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
      )

      for d := range msgs {
        log.Printf("Received a message: %s", d.Body)
        err:= processMessage(d)
        if err != nil {
            log.Printf("%s : while consuming task", err)
            d.Nack(false, true)
        } else {
            d.Ack(false)
        }
        continue // consume other messages
      }
  }()
}

但这是对 RAM 友好的方法吗?对 RabbitMQ 而言,为每个工作人员生成一个新 channel 不是很引人注目吗?

那么,问题是,哪种变体更好?更好的性能、更好的内存使用等。

那么,在这里 RabbitMQ 的最佳用法是什么?

更新:目前,我遇到了一个情况,我的工作人员在 VPS 上消耗了所有 RAM,并被 OOM 杀死。我使用了第二种方法。因此,对我来说更好的是能够让我的工作人员在工作几分钟后不会被 OOM 杀死。

更新2:当worker处理消息失败时nack,当worker处理消息时ack非常重要。所有消息都必须处理(它的客户分析),但有时工作人员无法处理它,因此它必须 nack 消息将其传递给其他工作人员(目前,一些第 3 方 api 有时用于处理消息简单地返回 503 状态代码,在这种情况下,消息应该传递给其他工作人员或重试)。 因此,很遗憾,使用auto-ack 不是一种选择。

最佳答案

我想每个 processMessage() 都在一个新的 goroutine 中运行。

Which variant is better?

我更喜欢第一个,因为打开/关闭 channel 有点昂贵(2 + 2 TCP 数据包)。我认为你的 OOM 问题与太多的 gorutine 无关,gorutine 很轻,大约 5KB。所以问题可能是由您的 processMessage() 引起的。

我认为 github.com/streadway/amqp channel 消费操作是 thread/gorutine-safe ,所以如果你只是做一些消费操作,那么在 goruntine 之间共享 channel 是安全的。

关于go - RabbitMQ 消费者性能 - 预取与并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57375790/

有关go - RabbitMQ 消费者性能 - 预取与并发的更多相关文章

  1. Ruby 的数字方法性能 - 2

    我正在使用Ruby解决一些ProjectEuler问题,特别是这里我要讨论的问题25(Fibonacci数列中包含1000位数字的第一项的索引是多少?)。起初,我使用的是Ruby2.2.3,我将问题编码为:number=3a=1b=2whileb.to_s.length但后来我发现2.4.2版本有一个名为digits的方法,这正是我需要的。我转换为代码:whileb.digits.length当我比较这两种方法时,digits慢得多。时间./025/problem025.rb0.13s用户0.02s系统80%cpu0.190总计./025/problem025.rb2.19s用户0.0

  2. ruby - Ruby 性能中的计时器 - 2

    我正在寻找一个用ruby​​演示计时器的在线示例,并发现了下面的代码。它按预期工作,但这个简单的程序使用30Mo内存(如Windows任务管理器中所示)和太多CPU有意义吗?非常感谢deftime_blockstart_time=Time.nowThread.new{yield}Time.now-start_timeenddefrepeat_every(seconds)whiletruedotime_spent=time_block{yield}#Tohandle-vesleepinteravalsleep(seconds-time_spent)iftime_spent

  3. ruby-on-rails - 获取并发布相同匹配项的请求 - 2

    在我的路线文件中我有:match'graphs/(:id(/:action))'=>'graphs#(:action)'如果是GET请求(工作)或POST请求(不工作),我想匹配它我知道我可以使用以下方法在资源中声明POST请求:post'/'=>:show,:on=>:member但是我怎样才能为比赛做到这一点呢?谢谢。 最佳答案 如果你同时想要POST和GETmatch'graphs/(:id(/:action))'=>'graphs#(:action)',:via=>[:get,:post]编辑默认值可以设置如下match'g

  4. ruby-on-rails - 如果条件与 &&,是否有任何性能提升 - 2

    如果用户是所有者,我有一个条件来检查说删除和文章。delete_articleifuser.owner?另一种方式是user.owner?&&delete_article选择它有什么好处还是它只是一种写作风格 最佳答案 性能不太可能成为该声明的问题。第一个要好得多-它更容易阅读。您future的自己和其他将开始编写代码的人会为此感谢您。 关于ruby-on-rails-如果条件与&&,是否有任何性能提升,我们在StackOverflow上找到一个类似的问题:

  5. ruby - 如何找到我的 Ruby 应用程序中的性能瓶颈? - 2

    我编写了一个Ruby应用程序,它可以解析来自不同格式html、xml和csv文件的源中的大量数据。我如何找出代码的哪些区域花费的时间最长?有没有关于如何提高Ruby应用程序性能的好资源?或者您是否有任何始终遵循的性能编码标准?例如,你总是用加入你的字符串吗?output=String.newoutput或者你会使用output="#{part_one}#{part_two}\n" 最佳答案 好吧,有一些众所周知的做法,例如字符串连接比“#{value}”慢得多,但是为了找出您的脚本在哪里消耗了大部分时间或比所需时间更多,您需要进行分

  6. ruby-on-rails - Textmate 'Go to symbol' 相当于 Vim - 2

    在Railcasts上,我注意到一个非常有趣的功能“转到符号”窗口。它像Command-T一样工作,但显示当前文件中可用的类和方法。如何在vim中获取它? 最佳答案 尝试:helptags有各种程序和脚本可以生成标记文件。此外,标记文件格式非常简单,因此很容易将sed(1)或类似的脚本组合在一起,无论您使用何种语言,它们都可以生成标记文件。轻松获取标记文件(除了下载生成器之外)的关键在于格式化样式而不是实际解析语法。 关于ruby-on-rails-Textmate'Gotosymbol

  7. STM32的HAL和LL库区别和性能对比 - 2

    LL库和HAL库简介LL:Low-Layer,底层库HAL:HardwareAbstractionLayer,硬件抽象层库LL库和hal库对比,很精简,这实际上是一个精简的库。LL库的配置选择如下:在STM32CUBEMX中,点击菜单的“ProjectManager”–>“AdvancedSettings”,在下面的界面中选择“AdvancedSettings”,然后在每个模块后面选择使用的库总结:1、如果使用的MCU是小容量的,那么STM32CubeLL将是最佳选择;2、如果结合可移植性和优化,使用STM32CubeHAL并使用特定的优化实现替换一些调用,可保持最大的可移植性。另外HAL和L

  8. ruby - GC.disable 的任何性能缺点? - 2

    是否存在GC.disable会降低性能的情况?只要我使用的是真正的RAM而不是交换内存,就可以这样做吗?我正在使用MRIRuby2.0,据我所知,它是64位的,并且使用的是64位的Ubuntu:ruby2.0.0p0(2013-02-24revision39474)[x86_64-linux]Linux[redacted]3.2.0-43-generic#68-UbuntuSMPWedMay1503:33:33UTC2013x86_64x86_64x86_64GNU/Linux 最佳答案 GC.disable将禁用垃圾回收。像rub

  9. ruby - 在 Ruby 中实现生产者消费者模式 - 2

    假设我有200个昂贵的方法调用(每个都有不同的参数)。出于某种原因,我可以并行执行其中的5个调用,但不能更多。我可以一次执行一个,但一次执行5个要快5倍。我想一直执行五件事。不想排五个,等五个都排完了,再排五个。如果我排队A、B、C、D、E并且C先完成,我想立即用F替换它,即使A和B还没有完成。我一直在研究这个问题,因为我可以想象它会定期发生。解决方案似乎是生产者-消费者模式,Ruby在其标准库中内置了一些用于该模式的结构(Queue和SizedQueue)。我玩过代码示例,阅读了一些文档,我想我对它有一个粗略的了解。但是我有一些问题我对我的解决方案没有信心,而且多线程的整个领域对我来

  10. ruby-on-rails - Rails with angular 与 Rails pure(查看性能) - 2

    我尝试在Internet上搜索有关使用angularJS进入RubyonRails项目与RubyonRailspure的View性能的信息。我的问题是因为2个月前我开始使用纯AngularJS,现在我需要将AngularJS集成到一个新项目中,但需要展示使用带有RubyonRails的AngularJS呈现View的性能如何,并消除对RubyonRails的负担.例如:带Rails的Angular:使用RubyonRails获取数据(从数据库或GET请求),将信息发送到file.js.erb并使用AngularJS操作数据并显示带有解析数据的View。纯粹的Rails:(自然流程)使用

随机推荐