草庐IT

python - RabbitMQ 在处理长时间运行的任务和超时设置产生错误时关闭连接

coder 2023-08-16 原文

我正在使用 RabbitMQ 生产者将长时间运行的任务(30 分钟以上)发送给消费者。问题是当与服务器的连接关闭并且未确认的任务重新排队时,消费者仍在处理任务。

通过研究我了解到 heartbeatincreased connection timeout可以用来解决这个问题。这两种解决方案在尝试时都会引发错误。在阅读类似帖子的答案时,我还了解到自发布答案以来,RabbitMQ 已经实现了许多更改(例如,默认心跳超时已从 RabbitMQ 3.5.5 之前的 580 更改为 60)。

指定心跳和阻塞连接超时时:

credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()

显示以下错误:

TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout'

在连接参数中指定 heartbeat_interval=1000 时会显示类似的错误:TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'

同样对于 socket_timeout = 1000 会显示以下错误:TypeError: __init__() got an unexpected keyword argument 'socket_timeout'

我在 Ubuntu 14.04 上运行 RabbitMQ 3.6.1、pika 0.10.0 和 python 2.7。

  1. 为什么上述方法会产生错误?
  2. 可以在有长时间运行的连续任务的情况下使用心跳方法吗?例如,在执行需要 30 分钟以上的大型数据库连接时可以使用心跳吗?我赞成心跳方法,因为很多时候很难判断数据库连接等任务需要多长时间。

我已经通读了类似问题的答案

更新:正在运行code from the pika documentation产生相同的错误。

最佳答案

我的系统遇到了与您看到的相同的问题,即在执行非常长的任务时连接中断。

如果您的网络设置强制断开空闲 TCP/IP 连接,心跳可能有助于保持连接有效。但是,如果不是这种情况,则更改心跳也无济于事。

更改连接超时根本无济于事。此设置仅在最初创建连接时使用。

I am using a RabbitMQ producer to send long running tasks (30 mins+) to a consumer. The problem is that the consumer is still working on a task when the connection to the server is closed and the unacknowledged task is requeued.

这有两个原因,您已经遇到了这两个原因:

  1. 连接随机断开,即使在最好的情况下也是如此
  2. 由于重新排队的消息而重新启动进程可能会导致问题

部署 RabbitMQ 代码的任务时间范围从不到一秒到几个小时不等,我发现立即确认消息并使用状态消息更新系统最适合非常长的任务,例如这样。

您需要有一个记录系统(可能带有数据库)来跟踪给定作业的状态。

当消费者选择消息并启动流程时,它应该立即确认消息并将“已启动”状态消息发送到记录系统。

当流程完成时,发送另一条消息表示它已完成。

这不会解决连接断开的问题,但无论如何都无法 100% 解决该问题。相反,它将防止在连接断开时发生消息重新排队问题。

不过,此解决方案确实引入了另一个问题:当长时间运行的进程崩溃时,您如何恢复工作?

基本的答案是使用工作的记录系统(您的数据库)状态来告诉您需要重新开始该工作。当应用程序启动时,检查数据库以查看是否有未完成的工作。如果有,以任何适当的方式恢复或重新启动该工作。

关于python - RabbitMQ 在处理长时间运行的任务和超时设置产生错误时关闭连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36123006/

有关python - RabbitMQ 在处理长时间运行的任务和超时设置产生错误时关闭连接的更多相关文章

  1. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

    总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

  2. ruby - 其他文件中的 Rake 任务 - 2

    我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时

  3. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  4. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  5. ruby - 如何每月在 Heroku 运行一次 Scheduler 插件? - 2

    在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/

  6. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  7. ruby - 无法运行 Rails 2.x 应用程序 - 2

    我尝试运行2.x应用程序。我使用rvm并为此应用程序设置其他版本的ruby​​:$rvmuseree-1.8.7-head我尝试运行服务器,然后出现很多错误:$script/serverNOTE:Gem.source_indexisdeprecated,useSpecification.Itwillberemovedonorafter2011-11-01.Gem.source_indexcalledfrom/Users/serg/rails_projects_terminal/work_proj/spohelp/config/../vendor/rails/railties/lib/r

  8. ruby - 如何使用 RSpec::Core::RakeTask 创建 RSpec Rake 任务? - 2

    如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake

  9. ruby - Sinatra:运行 rspec 测试时记录噪音 - 2

    Sinatra新手;我正在运行一些rspec测试,但在日志中收到了一堆不需要的噪音。如何消除日志中过多的噪音?我仔细检查了环境是否设置为:test,这意味着记录器级别应设置为WARN而不是DEBUG。spec_helper:require"./app"require"sinatra"require"rspec"require"rack/test"require"database_cleaner"require"factory_girl"set:environment,:testFactoryGirl.definition_file_paths=%w{./factories./test/

  10. ruby - 简单获取法拉第超时 - 2

    有没有办法在这个简单的get方法中添加超时选项?我正在使用法拉第3.3。Faraday.get(url)四处寻找,我只能先发起连接后应用超时选项,然后应用超时选项。或者有什么简单的方法?这就是我现在正在做的:conn=Faraday.newresponse=conn.getdo|req|req.urlurlreq.options.timeout=2#2secondsend 最佳答案 试试这个:conn=Faraday.newdo|conn|conn.options.timeout=20endresponse=conn.get(url

随机推荐