草庐IT

go - 处理来自 RabbitMQ 的消息时限制并发

coder 2023-07-02 原文

我正在尝试从队列 (RabbitMQ) 中读取 URL 并发出有限数量的并发 HTTP 请求,即有一个 10 人的工作池对从队列中接收到的 URL 发出并发请求(永远)。

到目前为止,我已经按照 RabbitMQ 教程实现了一个消费者: https://www.rabbitmq.com/tutorials/tutorial-one-go.html

并尝试了网络上发现的示例中的许多方法,以这里的示例结尾: http://jmoiron.net/blog/limiting-concurrency-in-go/

不幸的是,我当前的代码运行大约一分钟,然后无限期地卡住。我试过添加/移动 Go 例程,但我似乎无法让它按预期工作(我是 Go 的新手)。

当前代码:

package main

import (
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/Xide/bloom"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

var netClient = &http.Client{
    Timeout: time.Second * 10,
}

func getRequest(url string) {
    //resp, err := http.Get(string(url))
    resp, err := netClient.Get(string(url))
    if err != nil {
        log.Printf("HTTP request error: %s", err)
        return
    }
    fmt.Println("StatusCode:", resp.StatusCode)
    fmt.Println(resp.Request.URL)
}

func main() {
    bf := bloom.NewDefaultScalable(0.1)

    conn, err := amqp.Dial("amqp://127.0.0.1:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "urls",            // name
        true,              // durable
        false,             // delete when unused
        false,             // exclusive
        false,             // no-wait
        nil,               // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, //global
    )
    failOnError(err, "Failed to set Qos")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    concurrency := 10
    sem := make(chan bool, concurrency)
    go func() {
        for d := range msgs {
            sem <- true
            url := string(d.Body)
            if bf.Match(url) == false {
                bf.Feed(url)
                log.Printf("Not seen: %s", d.Body)
                go func(url string) {
                    defer func() { <-sem }()
                    getRequest(url)
                }(url)
            } else {
                log.Printf("Already seen: %s", d.Body)
            }
            d.Ack(false)
        }
        for i := 0; i < cap(sem); i++ {
            sem <- true
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

最佳答案

您没有正确处理您的 HTTP 响应,导致打开的连接越来越多。试试这个:

func getRequest(url string) {
    resp, err := netClient.Get(string(url))
    if err != nil {
        log.Printf("HTTP request error: %s", err)
        return
    }
    // Add this bit:
    defer func() {
        io.Copy(ioutil.Discard, resp.Body)
        resp.Body.Close()
    }()
    fmt.Println("StatusCode:", resp.StatusCode)
    fmt.Println(resp.Request.URL)
}

在您阅读完来自 channel 的消息后,这似乎是不必要的并且可能存在问题:

    for i := 0; i < cap(sem); i++ {
        sem <- true
    }

为什么在读取队列中的所有消息后填充 sem channel ?您已经向 channel 添加了与您希望从中读取的消息一样多的消息,所以这充其量是毫无意义的,如果您对其余代码进行了错误的更改,可能会导致问题。

与您的问题无关,但这是多余的:

if err != nil {
    log.Fatalf("%s: %s", msg, err)
    panic(fmt.Sprintf("%s: %s", msg, err))
}

根据 the documentation , Fatalf 已经退出,所以 panic 永远不会被调用。如果你想登录并 panic,试试 log.Panicf ,专为此目的而设计。

关于go - 处理来自 RabbitMQ 的消息时限制并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45858684/

有关go - 处理来自 RabbitMQ 的消息时限制并发的更多相关文章

  1. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  2. ruby-on-rails - 如何在 Rails View 上显示错误消息? - 2

    我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c

  3. ruby - 使用 Ruby 通过 Outlook 发送消息的最简单方法是什么? - 2

    我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=

  4. Ruby - 如何将消息长度表示为 2 个二进制字节 - 2

    我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi

  5. ruby-on-rails - 在 Flash 警报 Rails 3 中显示错误消息 - 2

    如果我在模型中设置验证消息validates:name,:presence=>{:message=>'Thenamecantbeblank.'}我如何让该消息显示在闪光警报中,这是我迄今为止尝试过的方法defcreate@message=Message.new(params[:message])if@message.valid?ContactMailer.send_mail(@message).deliverredirect_to(root_path,:notice=>"Thanksforyourmessage,Iwillbeintouchsoon")elseflash[:error]

  6. ruby-on-rails - 在 RSpec 中,如何以任意顺序期望具有不同参数的多条消息? - 2

    RSpec似乎按顺序匹配方法接收的消息。我不确定如何使以下代码工作:allow(a).toreceive(:f)expect(a).toreceive(:f).with(2)a.f(1)a.f(2)a.f(3)我问的原因是a.f的一些调用是由我的代码的上层控制的,所以我不能对这些方法调用添加期望。 最佳答案 RSpecspy是测试这种情况的一种方式。要监视一个方法,用allowstub,除了方法名称之外没有任何约束,调用该方法,然后expect确切的方法调用。例如:allow(a).toreceive(:f)a.f(2)a.f(1)

  7. Ruby-vips 图像处理库。有什么好的使用示例吗? - 2

    我对图像处理完全陌生。我对JPEG内部是什么以及它是如何工作一无所知。我想知道,是否可以在某处找到执行以下简单操作的ruby​​代码:打开jpeg文件。遍历每个像素并将其颜色设置为fx绿色。将结果写入另一个文件。我对如何使用ruby​​-vips库实现这一点特别感兴趣https://github.com/ender672/ruby-vips我的目标-学习如何使用ruby​​-vips执行基本的图像处理操作(Gamma校正、亮度、色调……)任何指向比“helloworld”更复杂的工作示例的链接——比如ruby​​-vips的github页面上的链接,我们将不胜感激!如果有ruby​​-

  8. ruby - 可以正常中断的来自 Rake 的长时间运行的 shell 命令? - 2

    在几个项目中,我希望有一个类似rakeserver的rake任务,它将通过任何需要的方式开始为该应用程序提供服务。这是一个示例:task:serverdo%x{bundleexecrackup-p1234}end这行得通,但是当我准备停止它时,按Ctrl+c并没有正常关闭;它中断了Rake任务本身,它说rakeaborted!并给出堆栈跟踪。在某些情况下,我必须执行Ctrl+c两次。我可能可以用Signal.trap写一些东西来更优雅地中断它。有没有更简单的方法? 最佳答案 trap('SIGINT'){puts"Yourmessa

  9. ruby - Faye WebSocket,关闭处理程序被触发后重新连接到套接字 - 2

    我有一个super简单的脚本,它几乎包含了FayeWebSocketGitHub页面上用于处理关闭连接的内容:ws=Faye::WebSocket::Client.new(url,nil,:headers=>headers)ws.on:opendo|event|p[:open]#sendpingcommand#sendtestcommand#ws.send({command:'test'}.to_json)endws.on:messagedo|event|#hereistheentrypointfordatacomingfromtheserver.pJSON.parse(event.d

  10. ruby - 如何使用 Ruby HTTP::Net 处理 404 错误? - 2

    我正在尝试解析网页,但有时会收到404错误。这是我用来获取网页的代码:result=Net::HTTP::getURI.parse(URI.escape(url))如何测试result是否为404错误代码? 最佳答案 像这样重写你的代码:uri=URI.parse(url)result=Net::HTTP.start(uri.host,uri.port){|http|http.get(uri.path)}putsresult.codeputsresult.body这将打印状态码和正文。

随机推荐