我目前正在使用 golang (github.com/streadway/amqp) 测试 rabbitmq,我有三个程序,其中两个以不同的优先级向队列发送消息,一个从队列读取。
我遇到的问题是,在用这两个程序发送了一些消息之后,我继续启动将从队列中读取的程序,当它开始从队列中读取时,它会像 FIFO 一样输出结果。
我希望先输出高优先级的消息,然后再输出低优先级的消息。
是我误解了 rabbitmq 的工作原理还是我做错了什么?
发送到队列 包主
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
args := make(amqp.Table)
args["x-max-priority"] = int64(9)
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"test", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
args, // arguments
)
failOnError(err, "Failed to declare a queue")
body := "low"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
Priority: 0,
})
log.Printf(" [x] Sent %s", body)
failOnError(err, "Failed to publish a message")
body = "low"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
Priority: 0,
})
log.Printf(" [x] Sent %s", body)
failOnError(err, "Failed to publish a message")
body = "low"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
Priority: 0,
})
log.Printf(" [x] Sent %s", body)
failOnError(err, "Failed to publish a message")
body = "high"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
Priority: 9,
})
log.Printf(" [x] Sent %s", body)
failOnError(err, "Failed to publish a message")
body = "low"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
Priority: 0,
})
log.Printf(" [x] Sent %s", body)
failOnError(err, "Failed to publish a message")
body = "low"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
Priority: 0,
})
log.Printf(" [x] Sent %s", body)
failOnError(err, "Failed to publish a message")
body = "low"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: amqp.Table{},
ContentType: "text/plain",
ContentEncoding: "",
Body: []byte(body),
DeliveryMode: amqp.Transient, // 1=non-persistent, 2=persistent
Priority: 0,
})
log.Printf(" [x] Sent %s", body)
failOnError(err, "Failed to publish a message")
}
阅读队列:
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
args := make(amqp.Table)
args["x-max-priority"] = int64(9)
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
q, err := ch.QueueDeclare(
"test", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
args, // arguments
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
args, // args
)
failOnError(err, "Failed to register a consumer")
defer ch.Close()
defer conn.Close()
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
}
首先,我启动代码,将消息发送到以下值的队列:
low 消息的优先级为 0,high 消息的优先级为 9。
然后我启动将接收队列的程序。
预期输出:
实际输出:
谢谢
最佳答案
这是我可以从您的代码以及您所遵循的步骤的描述中得出的结论。
问题似乎是您没有在每段代码中以相同的方式创建队列,低优先级和高优先级生产者在没有 x-max-priority 参数的情况下创建队列。
由于队列在创建时被定义为优先队列,只有在那个时候,您才真正创建了一个“标准”队列。
您应该能够通过检查创建的队列及其参数轻松确认这一点。
关于go - AMQP Golang 优先级不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47872193/
如果您尝试在Ruby中的nil对象上调用方法,则会出现NoMethodError异常并显示消息:"undefinedmethod‘...’fornil:NilClass"然而,有一个tryRails中的方法,如果它被发送到一个nil对象,它只返回nil:require'rubygems'require'active_support/all'nil.try(:nonexisting_method)#noNoMethodErrorexceptionanymore那么try如何在内部工作以防止该异常? 最佳答案 像Ruby中的所有其他对象
我目前正在尝试学习RubyonRails和测试框架RSpec。assigns在此RSpec测试中做什么?describe"GETindex"doit"assignsallmymodelas@mymodel"domymodel=Factory(:mymodel)get:indexassigns(:mymodels).shouldeq([mymodel])endend 最佳答案 assigns只是检查您在Controller中设置的实例变量的值。这里检查@mymodels。 关于ruby-o
这段代码似乎创建了一个范围从a到z的数组,但我不明白*的作用。有人可以解释一下吗?[*"a".."z"] 最佳答案 它叫做splatoperator.SplattinganLvalueAmaximumofonelvaluemaybesplattedinwhichcaseitisassignedanArrayconsistingoftheremainingrvaluesthatlackcorrespondinglvalues.Iftherightmostlvalueissplattedthenitconsumesallrvaluesw
你能解释一下吗?我想评估来自两个不同来源的值和计算。一个消息来源为我提供了以下信息(以编程方式):'a=2'第二个来源给了我这个表达式来评估:'a+3'这个有效:a=2eval'a+3'这也有效:eval'a=2;a+3'但我真正需要的是这个,但它不起作用:eval'a=2'eval'a+3'我想了解其中的区别,以及如何使最后一个选项起作用。感谢您的帮助。 最佳答案 您可以创建一个Binding,并将相同的绑定(bind)与每个eval相关联调用:1.9.3p194:008>b=binding=>#1.9.3p194:009>eva
我无法运行Spring。这是错误日志。myid-no-MacBook-Pro:myid$spring/Users/myid/.rbenv/versions/1.9.3-p484/lib/ruby/gems/1.9.1/gems/spring-0.0.10/lib/spring/sid.rb:17:in`fiddle_func':uninitializedconstantSpring::SID::DL(NameError)from/Users/myid/.rbenv/versions/1.9.3-p484/lib/ruby/gems/1.9.1/gems/spring-0.0.10/li
我在RoR应用程序中有一个提交表单,是使用simple_form构建的。当字段为空白时,应用程序仍会继续下一步,而不会提示错误或警告。默认情况下,这些字段应该是required:true;但即使手动编写也行不通。该应用有3个步骤:NewPost(新View)->Preview(创建View)->Post。我的Controller和View的摘录会更清楚:defnew@post=Post.newenddefcreate@post=Post.new(params.require(:post).permit(:title,:category_id))ifparams[:previewButt
在Railcasts上,我注意到一个非常有趣的功能“转到符号”窗口。它像Command-T一样工作,但显示当前文件中可用的类和方法。如何在vim中获取它? 最佳答案 尝试:helptags有各种程序和脚本可以生成标记文件。此外,标记文件格式非常简单,因此很容易将sed(1)或类似的脚本组合在一起,无论您使用何种语言,它们都可以生成标记文件。轻松获取标记文件(除了下载生成器之外)的关键在于格式化样式而不是实际解析语法。 关于ruby-on-rails-Textmate'Gotosymbol
我一直在Heroku上尝试不同的缓存策略,并添加了他们的memcached附加组件,目的是为我的应用程序添加Action缓存。但是,当我在我当前的应用程序上查看Rails.cache.stats时(安装了memcached并使用dalligem),在执行应该缓存的操作后,我得到current和total_items为0。在Controller的顶部,我想缓存我有的Action:caches_action:show此外,我修改了我的环境配置(对于在Heroku上运行的配置)config.cache_store=:dalli_store我是否可以查看其他一些统计数据,看看它是否有效或我做错
我在我的机器上安装了ruby版本1.9.3,并且正在为我的个人网站开发一个octopress项目。我为我的gems使用了rvm,并遵循了octopress.org记录的所有步骤。但是我在我的rake服务器中发现了一些错误。这是我的命令日志。Tin-Aung-Linn:octopresstal$ruby--versionruby1.9.3p448(2013-06-27revision41675)[x86_64-darwin12.4.0]Tin-Aung-Linn:octopresstal$rakegenerate##GeneratingSitewithJekyllidenticals
在以下示例中,我无法理解Ruby运算符的优先级:x=1&&y=2由于&&的优先级高于=,我的理解是类似于+和*运算符:1+2*3+4解析为1+(2*3)+4它应该等于:x=(1&&y)=2但是,所有Ruby源代码(包括内部语法解析器Ripper)都将其解析为x=(1&&(y=2))为什么?编辑[08.01.2016]让我们关注一个子表达式:1&&y=2根据优先规则,我们应该尝试将其解析为:(1&&y)=2这没有意义,因为=需要特定的LHS(变量、常量、[]数组项等)。但是既然(1&&y)是一个正确的表达式,那么解析器应该如何处理呢?我试过咨询Ruby的parse.y,但它太像意大利面条