草庐IT

go - AMQP Golang 优先级不起作用

coder 2023-07-01 原文

我目前正在使用 golang (github.com/streadway/amqp) 测试 rabbitmq,我有三个程序,其中两个以不同的优先级向队列发送消息,一个从队列读取。
我遇到的问题是,在用这两个程序发送了一些消息之后,我继续启动将从队列中读取的程序,当它开始从队列中读取时,它会像 FIFO 一样输出结果。
我希望先输出高优先级的消息,然后再输出低优先级的消息。
是我误解了 rabbitmq 的工作原理还是我做错了什么?

发送到队列 包主

import (
        "log"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        args := make(amqp.Table)
        args["x-max-priority"] = int64(9)

        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "test", // name                                                                                                                                                                                     
                false,   // durable                                                                                                                                                                                 
                false,   // delete when unused                                                                                                                                                                      
                false,   // exclusive                                                                                                                                                                               
                false,   // no-wait                                                                                                                                                                                 
                args,     // arguments                                                                                                                                                                              
        )
        failOnError(err, "Failed to declare a queue")

        body := "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")

        body = "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")

        body = "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")


        body = "high"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 9,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")


        body = "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")


        body = "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")


        body = "low"
        err = ch.Publish(
                "",     // exchange                                                                                                                                                                                 
                q.Name, // routing key                                                                                                                                                                              
                false,  // mandatory                                                                                                                                                                                
                false,  // immediate                                                                                                                                                                                
                amqp.Publishing{
                        Headers:         amqp.Table{},
                        ContentType:     "text/plain",
                        ContentEncoding: "",
                        Body:            []byte(body),
                        DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent                                                                                                                          
                        Priority: 0,
                })
        log.Printf(" [x] Sent %s", body)
        failOnError(err, "Failed to publish a message")

}

阅读队列:

package main

import (
        "log"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        args := make(amqp.Table)
        args["x-max-priority"] = int64(9)

        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")

        q, err := ch.QueueDeclare(
                "test", // name                                                                                                                                                                                     
                false,   // durable                                                                                                                                                                                 
                false,   // delete when unused                                                                                                                                                                      
                false,   // exclusive                                                                                                                                                                               
                false,   // no-wait                                                                                                                                                                                 
                args,     // arguments                                                                                                                                                                              
        )
        failOnError(err, "Failed to declare a queue")

        msgs, err := ch.Consume(
                q.Name, // queue                                                                                                                                                                                    
                "",     // consumer                                                                                                                                                                                 
                true,   // auto-ack                                                                                                                                                                                 
                false,  // exclusive                                                                                                                                                                                
                false,  // no-local                                                                                                                                                                                 
                false,  // no-wait                                                                                                                                                                                  
                args,    // args                                                                                                                                                                                    
        )
        failOnError(err, "Failed to register a consumer")

        defer ch.Close()
        defer conn.Close()

        forever := make(chan bool)

        go func() {
                for d := range msgs {
                        log.Printf("Received a message: %s", d.Body)
                }
        }()

        log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
        <-forever
}

首先,我启动代码,将消息发送到以下值的队列:

low 消息的优先级为 0,high 消息的优先级为 9。
然后我启动将接收队列的程序。
预期输出:

实际输出:

谢谢

最佳答案

这是我可以从您的代码以及您所遵循的步骤的描述中得出的结论。

问题似乎是您没有在每段代码中以相同的方式创建队列,低优先级和高优先级生产者在没有 x-max-priority 参数的情况下创建队列。

由于队列在创建时被定义为优先队列,只有在那个时候,您才真正创建了一个“标准”队列。

您应该能够通过检查创建的队列及其参数轻松确认这一点。

关于go - AMQP Golang 优先级不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47872193/

有关go - AMQP Golang 优先级不起作用的更多相关文章

  1. ruby-on-rails - 如果 Object::try 被发送到一个 nil 对象,为什么它会起作用? - 2

    如果您尝试在Ruby中的nil对象上调用方法,则会出现NoMethodError异常并显示消息:"undefinedmethod‘...’fornil:NilClass"然而,有一个tryRails中的方法,如果它被发送到一个nil对象,它只返回nil:require'rubygems'require'active_support/all'nil.try(:nonexisting_method)#noNoMethodErrorexceptionanymore那么try如何在内部工作以防止该异常? 最佳答案 像Ruby中的所有其他对象

  2. ruby-on-rails - "assigns"在 Ruby on Rails 中有什么作用? - 2

    我目前正在尝试学习RubyonRails和测试框架RSpec。assigns在此RSpec测试中做什么?describe"GETindex"doit"assignsallmymodelas@mymodel"domymodel=Factory(:mymodel)get:indexassigns(:mymodels).shouldeq([mymodel])endend 最佳答案 assigns只是检查您在Controller中设置的实例变量的值。这里检查@mymodels。 关于ruby-o

  3. ruby - 字符串文字前面的 * 在 ruby​​ 中有什么作用? - 2

    这段代码似乎创建了一个范围从a到z的数组,但我不明白*的作用。有人可以解释一下吗?[*"a".."z"] 最佳答案 它叫做splatoperator.SplattinganLvalueAmaximumofonelvaluemaybesplattedinwhichcaseitisassignedanArrayconsistingoftheremainingrvaluesthatlackcorrespondinglvalues.Iftherightmostlvalueissplattedthenitconsumesallrvaluesw

  4. ruby - 为什么这个 eval 在 Ruby 中不起作用 - 2

    你能解释一下吗?我想评估来自两个不同来源的值和计算。一个消息来源为我提供了以下信息(以编程方式):'a=2'第二个来源给了我这个表达式来评估:'a+3'这个有效:a=2eval'a+3'这也有效:eval'a=2;a+3'但我真正需要的是这个,但它不起作用:eval'a=2'eval'a+3'我想了解其中的区别,以及如何使最后一个选项起作用。感谢您的帮助。 最佳答案 您可以创建一个Binding,并将相同的绑定(bind)与每个eval相关联调用:1.9.3p194:008>b=binding=>#1.9.3p194:009>eva

  5. ruby-on-rails - Spring 不起作用。 [未初始化常量 Spring::SID::DL] - 2

    我无法运行Spring。这是错误日志。myid-no-MacBook-Pro:myid$spring/Users/myid/.rbenv/versions/1.9.3-p484/lib/ruby/gems/1.9.1/gems/spring-0.0.10/lib/spring/sid.rb:17:in`fiddle_func':uninitializedconstantSpring::SID::DL(NameError)from/Users/myid/.rbenv/versions/1.9.3-p484/lib/ruby/gems/1.9.1/gems/spring-0.0.10/li

  6. ruby-on-rails - Simple_form 必填字段不起作用 - Ruby on Rails - 2

    我在RoR应用程序中有一个提交表单,是使用simple_form构建的。当字段为空白时,应用程序仍会继续下一步,而不会提示错误或警告。默认情况下,这些字段应该是required:true;但即使手动编写也行不通。该应用有3个步骤:NewPost(新View)->Preview(创建View)->Post。我的Controller和View的摘录会更清楚:defnew@post=Post.newenddefcreate@post=Post.new(params.require(:post).permit(:title,:category_id))ifparams[:previewButt

  7. ruby-on-rails - Textmate 'Go to symbol' 相当于 Vim - 2

    在Railcasts上,我注意到一个非常有趣的功能“转到符号”窗口。它像Command-T一样工作,但显示当前文件中可用的类和方法。如何在vim中获取它? 最佳答案 尝试:helptags有各种程序和脚本可以生成标记文件。此外,标记文件格式非常简单,因此很容易将sed(1)或类似的脚本组合在一起,无论您使用何种语言,它们都可以生成标记文件。轻松获取标记文件(除了下载生成器之外)的关键在于格式化样式而不是实际解析语法。 关于ruby-on-rails-Textmate'Gotosymbol

  8. ruby-on-rails - Heroku Action 缓存似乎不起作用 - 2

    我一直在Heroku上尝试不同的缓存策略,并添加了他们的memcached附加组件,目的是为我的应用程序添加Action缓存。但是,当我在我当前的应用程序上查看Rails.cache.stats时(安装了memcached并使用dalligem),在执行应该缓存的操作后,我得到current和total_items为0。在Controller的顶部,我想缓存我有的Action:caches_action:show此外,我修改了我的环境配置(对于在Heroku上运行的配置)config.cache_store=:dalli_store我是否可以查看其他一些统计数据,看看它是否有效或我做错

  9. ruby-on-rails - Rake 预览在 Octopress 中不起作用 - 2

    我在我的机器上安装了ruby​​版本1.9.3,并且正在为我的个人网站开发一个octopress项目。我为我的gems使用了rvm,并遵循了octopress.org记录的所有步骤。但是我在我的rake服务器中发现了一些错误。这是我的命令日志。Tin-Aung-Linn:octopresstal$ruby--versionruby1.9.3p448(2013-06-27revision41675)[x86_64-darwin12.4.0]Tin-Aung-Linn:octopresstal$rakegenerate##GeneratingSitewithJekyllidenticals

  10. ruby - 了解 Ruby 中赋值和逻辑运算符的优先级 - 2

    在以下示例中,我无法理解Ruby运算符的优先级:x=1&&y=2由于&&的优先级高于=,我的理解是类似于+和*运算符:1+2*3+4解析为1+(2*3)+4它应该等于:x=(1&&y)=2但是,所有Ruby源代码(包括内部语法解析器Ripper)都将其解析为x=(1&&(y=2))为什么?编辑[08.01.2016]让我们关注一个子表达式:1&&y=2根据优先规则,我们应该尝试将其解析为:(1&&y)=2这没有意义,因为=需要特定的LHS(变量、常量、[]数组项等)。但是既然(1&&y)是一个正确的表达式,那么解析器应该如何处理呢?我试过咨询Ruby的parse.y,但它太像意大利面条

随机推荐