我正在尝试找到一种从输入队列异步使用、使用多个工作程序处理内容然后发布到输出队列的好方法。到目前为止,我已经尝试了很多示例,最近使用的代码来自 here和 here作为灵感。
然而,我当前的代码似乎没有做它应该做的事情,增加工作人员的数量不会提高性能(消耗或发布的 msg/s),并且 goroutines 的数量在运行时保持相当稳定。
主要内容:
func main() {
maxWorkers := 10
// channel for jobs
in := make(chan []byte)
out := make(chan []byte)
// start workers
wg := &sync.WaitGroup{}
wg.Add(maxWorkers)
for i := 1; i <= maxWorkers; i++ {
log.Println(i)
defer wg.Done()
go processor(in, out)
}
// add jobs
go collector(in)
go sender(out)
// wait for workers to complete
wg.Wait()
}
收集器基本上是来自 RabbitMQ 站点的示例,带有一个 goroutine,它从队列中收集消息并将它们放在“in” channel 上:
forever := make(chan bool)
go func() {
for d := range msgs {
in <- d.Body
d.Ack(false)
}
}()
log.Printf("[*] Waiting for messages. To exit press CTRL+C")
<-forever
处理器接收“输入”和“输出” channel ,解码 JSON,执行一系列正则表达式,然后将输出放入“输出” channel :
func processor(in chan []byte, out chan []byte) {
var (
// list of regexes declared here
)
for {
body := <-in
jsonIn := &Data{}
err := json.Unmarshal(body, jsonIn)
if err != nil {
log.Fatalln("Failed to decode:", err)
}
content := jsonIn.Content
//process regexes using:
//jsonIn.a = r1.FindAllString(content, -1)
jsonOut, _ := json.Marshal(jsonIn)
out <- jsonOut
}
}
最后,发送方只是来自 RabbitMQ 站点的代码,设置连接,从“输出” channel 读取数据,然后发布到 RMQ 队列:
for {
jsonOut := <-out
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/json",
Body: []byte(jsonOut),
})
failOnError(err, "Failed to publish a message")
}
这是一个我会经常使用的模式,所以我花了很多时间试图找到正确(并且很好)工作的东西 - 任何建议或帮助将不胜感激(如果它并不明显,我是 Go 的新手)。
最佳答案
有几件事跳出来了:
在主函数中完成
wg.Add(maxWorkers)
for i := 1; i <= maxWorkers; i++ {
log.Println(i)
defer wg.Done()
go processor(in, out)
}
此处的 defer 是在 main 返回时执行的,因此它实际上并不表示处理何时完成。不过,我认为这不会对您的程序的性能概况产生影响。
要解决这个问题,您可以将 wg *sync.WaitGroup 传递给您的处理器,以便您的处理器可以指示它何时完成。
CPU 绑定(bind)处理
解析消息和执行 Regex 是 CPU 密集型工作负载。你的机器是几核的?如果您在两台不同的机器上运行您的程序,吞吐量会受到怎样的影响,吞吐量是 2 倍吗?如果将内核数量加倍怎么办?用 1 个 worker 和 2 个处理器 worker 运行你的程序怎么样?这会使吞吐量翻倍吗?你的 rabbitmq 本地实例用完了吗?是瓶颈吗??
设置基准测试和负载测试工具应该允许您设置实验以查看瓶颈所在:)
对于基于队列的服务,很容易设置一个测试工具来为 rabbitmq 填充一组积压和基准测试你可以多快地处理这些消息,或者设置一个负载生成器以每秒发送 x 条消息到 rabbitmq 并观察你是否可以跟上。
rabbitmq 对消息处理吞吐量有很好的可见性吗?如果不是,我经常添加一个计数器来执行代码,然后记录一个时间间隔的总体平均吞吐量,以大致了解性能:
start := time.Now()
updateInterval := time.Tick(1 * time.Second)
numIn := 0
for {
select {
case <-updateInterval:
log.Infof("IN - Count: %d", numIn)
log.Infof("IN - Througput: %.0f events/second",
float64(numIn)/(time.Now().Sub(start)).Seconds())
case e := <-msgs:
numIn++
in <- d.Body
d.Ack(false)
}
}
关于go - RabbitMQ 多 worker 模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46448741/
我有一个模型:classItem项目有一个属性“商店”基于存储的值,我希望Item对象对特定方法具有不同的行为。Rails中是否有针对此的通用设计模式?如果方法中没有大的if-else语句,这是如何干净利落地完成的? 最佳答案 通常通过Single-TableInheritance. 关于ruby-on-rails-Rails-子类化模型的设计模式是什么?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.co
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
鉴于我有以下迁移:Sequel.migrationdoupdoalter_table:usersdoadd_column:is_admin,:default=>falseend#SequelrunsaDESCRIBEtablestatement,whenthemodelisloaded.#Atthispoint,itdoesnotknowthatusershaveais_adminflag.#Soitfails.@user=User.find(:email=>"admin@fancy-startup.example")@user.is_admin=true@user.save!ende
给定一个复杂的对象层次结构,幸运的是它不包含循环引用,我如何实现支持各种格式的序列化?我不是来讨论实际实现的。相反,我正在寻找可能会派上用场的设计模式提示。更准确地说:我正在使用Ruby,我想解析XML和JSON数据以构建复杂的对象层次结构。此外,应该可以将该层次结构序列化为JSON、XML和可能的HTML。我可以为此使用Builder模式吗?在任何提到的情况下,我都有某种结构化数据-无论是在内存中还是文本中-我想用它来构建其他东西。我认为将序列化逻辑与实际业务逻辑分开会很好,这样我以后就可以轻松支持多种XML格式。 最佳答案 我最
了解Rails缓存如何工作的人可以真正帮助我。这是嵌套在Rails::Initializer.runblock中的代码:config.after_initializedoSomeClass.const_set'SOME_CONST','SOME_VAL'end现在,如果我运行script/server并发出请求,一切都很好。然而,在我的Rails应用程序的第二个请求中,一切都因单元化常量错误而变得糟糕。在生产模式下,我可以成功发出第二个请求,这意味着常量仍然存在。我已通过将以上内容更改为以下内容来解决问题:config.after_initializedorequire'some_cl
我经常迷上ruby的一件事是递归模式。例如,假设我有一个数组,它可能包含无限深度的数组作为元素。所以,例如:my_array=[1,[2,3,[4,5,[6,7]]]]我想创建一个方法,可以将数组展平为[1,2,3,4,5,6,7]。我知道.flatten可以完成这项工作,但这个问题是作为我经常遇到的递归问题的一个例子-因此我试图找到一个更可重用的解决方案。简而言之-我猜这种事情有一个标准模式,但我想不出任何特别优雅的东西。任何想法表示赞赏 最佳答案 递归是一种方法,它不依赖于语言。您在编写算法时要考虑两种情况:再次调用函数的情
这应该是一个简单的问题,但我找不到任何相关信息。给定一个Ruby中的正则表达式,对于每个匹配项,我需要检索匹配的模式$1、$2,但我还需要匹配位置。我知道=~运算符为我提供了第一个匹配项的位置,而string.scan(/regex/)为我提供了所有匹配模式。如果可能,我需要在同一步骤中获得两个结果。 最佳答案 MatchDatastring.scan(regex)do$1#Patternatfirstposition$2#Patternatsecondposition$~.offset(1)#Startingandendingpo
我想开始使用“Sinatra”框架进行编码,但我找不到该框架的“MVC”模式。是“MVC-Sinatra”模式或框架吗? 最佳答案 您可能想查看Padrino这是一个围绕Sinatra构建的框架,可为您的项目提供更“类似Rails”的感觉,但没有那么多隐藏的魔法。这是使用Sinatra可以做什么的一个很好的例子。虽然如果您需要开始使用这很好,但我个人建议您将它用作学习工具,以对您来说最有意义的方式使用Sinatra构建您自己的应用程序。写一些测试/期望,写一些代码,通过测试-重复:)至于ORM,你还应该结帐Sequel其中(imho
有没有一种方法可以自动生成种子数据文件并创建种子数据,就像您在下面链接中的Laravel中看到的那样?LaravelDatabaseMigrations&Seed我在另一个应用程序上看到在Rails的db文件夹下创建了一些带有时间戳的文件,其中包含种子数据。创建它的好方法是什么? 最佳答案 我建议你使用Fabrication的组合gem和Faker.Fabrication允许您编写一个模式来构建您的对象,而Faker为您提供虚假数据,如姓名、电子邮件、电话号码等。这是制造商的样子:Fabricator(:user)dousernam
我有一个交互式RubyonRails应用程序,我想在特定时间将其置于“只读模式”。这将允许用户读取他们需要的数据,但阻止他们执行写入数据库的操作。执行此操作的一种方法是在数据库中放置一个true/false变量,该变量在进行任何写入之前进行检查。我的问题。有没有更优雅的解决方案来解决这个问题? 最佳答案 如果你真的想阻止任何数据库写入,我能想到的最简单的方法是覆盖readonly?始终返回true的模型方法,无论是在选定模型中还是对于所有ActiveRecord模型。如果模型设置为只读(通常通过调用#readonly!来完成),任何