我正在使用 celery(并发池 = 1),我希望能够在特定任务运行后关闭工作程序。需要注意的是,我想避免 worker 在那之后再接手任何其他任务的可能性。
这是我在大纲中的尝试:
from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.exceptions import WorkerShutdown
from celery.signals import task_postrun
app = Celery()
app.config_from_object('celeryconfig')
@app.task
def add(x, y):
return x + y
@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
raise WorkerShutdown()
但是,当我运行 worker 时
celery -A celeryapp worker --concurrency=1 --pool=solo
并运行任务
add.delay(1,4)
我得到以下信息:
-------------- celery@sam-APOLLO-2000 v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Linux-4.4.0-116-generic-x86_64-with-Ubuntu-16.04-xenial 2018-03-18 14:08:37
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: __main__:0x7f596896ce90
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 4 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[2018-03-18 14:08:39,892: WARNING/MainProcess] Restoring 1 unacknowledged message(s)
任务重新排队,将在另一个工作人员上再次运行,导致循环。
当我在任务本身中移动 WorkerShutdown 异常时也会发生这种情况。
@app.task
def add(x, y):
print(x + y)
raise WorkerShutdown()
有没有一种方法可以在完成特定任务后关闭工作器,同时避免这种不幸的副作用?
最佳答案
关闭工作器的推荐过程是发送 TERM 信号。这将导致 celery worker 在完成任何当前正在运行的任务后关闭。如果你向 worker 的主进程发送 QUIT 信号,worker 将立即关闭。
然而,celery 文档通常讨论从命令行或通过 systemd/initd 管理 celery,但 celery 还通过 celery.app.control 提供远程工作控制 API。
您可以revoke阻止 worker 执行任务的任务。这应该可以防止您遇到的循环。此外,控制支持shutdown worker 也是这样。
所以我想下面的内容会让你得到你想要的行为。
@app.task(bind=True)
def shutdown(self):
app.control.revoke(self.id) # prevent this task from being executed again
app.control.shutdown() # send shutdown signal to all workers
由于目前无法从任务中确认任务,然后继续执行所述任务,这种使用 revoke 的方法规避了这个问题,即使任务再次排队,新 worker 会直接忽略它。
或者,以下内容也会阻止重新交付的任务再次执行...
@app.task(bind=True)
def some_task(self):
if self.request.delivery_info['redelivered']:
raise Ignore() # ignore if this task was redelivered
print('This should only execute on first receipt of task')
另外值得注意的是AsyncResult还有一个 revoke 方法为你调用 self.app.control.revoke。
关于python - 特定任务后 celery 关闭 worker ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49348851/
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake
下面的代码在我第一次运行它时就可以正常工作:require'rubygems'require'spreadsheet'book=Spreadsheet.open'/Users/me/myruby/Mywks.xls'sheet=book.worksheet0row=sheet.row(1)putsrow[1]book.write'/Users/me/myruby/Mywks.xls'当我再次运行它时,我会收到更多消息,例如:/Library/Ruby/Gems/1.8/gems/spreadsheet-0.6.5.9/lib/spreadsheet/excel/reader.rb:11
这个问题在这里已经有了答案:关闭10年前。PossibleDuplicate:Pythonconditionalassignmentoperator对于这样一个简单的问题表示歉意,但是谷歌搜索||=并不是很有帮助;)Python中是否有与Ruby和Perl中的||=语句等效的语句?例如:foo="hey"foo||="what"#assignfooifit'sundefined#fooisstill"hey"bar||="yeah"#baris"yeah"另外,类似这样的东西的通用术语是什么?条件分配是我的第一个猜测,但Wikipediapage跟我想的不太一样。
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht
华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o
我想解析一个已经存在的.mid文件,改变它的乐器,例如从“acousticgrandpiano”到“violin”,然后将它保存回去或作为另一个.mid文件。根据我在文档中看到的内容,该乐器通过program_change或patch_change指令进行了更改,但我找不到任何在已经存在的MIDI文件中执行此操作的库.他们似乎都只支持从头开始创建的MIDI文件。 最佳答案 MIDIpackage会为您完成此操作,但具体方法取决于midi文件的原始内容。一个MIDI文件由一个或多个音轨组成,每个音轨是十六个channel中任何一个上的
本文主要介绍在使用Selenium进行自动化测试或者任务时,对于使用了iframe的页面,如何定位iframe中的元素文章目录场景描述解决方案具体代码场景描述当我们在使用Selenium进行自动化测试的时候,可能会遇到一些界面或者窗体是使用HTML的iframe标签进行承载的。对于iframe中的标签,如果直接查找是无法找到的,会抛出没有找到元素的异常。比如近在咫尺的例子就是,CSDN的登录窗体就是使用的iframe,大家可以尝试通过F12开发者模式查看到的tag_name,class_name,id或者xpath来定位中的页面元素,会抛出NoSuchElementException异常。解决
在读取/解析文件(使用Ruby)时忽略某些行的最佳方法是什么?我正在尝试仅解析Cucumber.feature文件中的场景,并希望跳过不以Scenario/Given/When/Then/And/But开头的行。下面的代码有效,但它很荒谬,所以我正在寻找一个聪明的解决方案:)File.open(file).each_linedo|line|line.chomp!nextifline.empty?nextifline.include?"#"nextifline.include?"Feature"nextifline.include?"Inorder"nextifline.include?