对于基于Python/Django/Celery的部署工具,我们有以下设置:我们目前使用默认的Celery设置。(一个队列+交换称为“celery”。)队列中的每个任务代表一个部署操作。环境的每项任务都以可能需要(非常)长的同步阶段结束。需要满足以下规范:并发性:多个环境的任务应该同时执行。锁定:可能至多每个环境同时运行任务(即环境锁定)。吞吐量优化:当单个环境有多个任务时,可以将它们的同步阶段合并起来进行优化。因此,如果任务接近尾声,它应该检查队列中是否有新任务等待此环境,如果有,则跳过其同步阶段。实现它的首选方法是什么?一些想法:我会说我们必须设置多个队列:每个环境一个,并让N个c
很多天后,我有了一个工作的celery和celery节拍任务列表,结果使用django_celery_results存储。但是,当我查看表记录时,其中没有任何有用的信息。是否可以将任务ID设置为人类可读的内容?一个例子是使用demo任务,它返回no,但是一个不可读的任务id任务.py@app.taskdeftest(a,b):returna+bapp.settings中的调度程序CELERYBEAT_SCHEDULE={'test_task':{'task':'home.tasks.test','schedule':crontab(minute='*/1'),},
如何防止Celery在上一次执行完成之前执行周期性任务?我有一个服务器集群,链接到一个公共(public)数据库服务器,执行Celery任务,我发现每个服务器可能偶尔同时运行相同的任务,以及不同的服务器同时运行相同的任务。这会导致大量竞态条件以极其微妙的方式破坏我的数据。我一直在阅读Celery'sdocs,但我找不到任何明确允许这样做的选项。我找到了一个similarquestion,但建议的修复程序似乎是一个hack,因为它依赖于Django的缓存框架,因此可能不会被集群中的所有服务器共享,从而允许多个服务器同时执行相同的任务。Celery中有没有选项可以记录数据库中当前正在运行的
我的情况与概述的情况类似here,除了不是用多个参数链接任务,我想链接返回包含多个条目的字典的任务。这是——非常松散和抽象的——我正在尝试做的事情:任务.py@task()deftask1(item1=None,item2=None):item3=#dosomestuffwithitem1anditem2toyielditem3return_object=dict(item1=item1,item2=item2,item3=item3)returnreturn_objectdeftask2(item1=None,item2=None,item3=None):item4=#dosomet
我想不通的是,尽管ThreadPoolExecutor使用守护进程,但即使主线程退出,它们仍会运行。我可以在python3.6.4中提供一个最小的例子:importconcurrent.futuresimporttimedeffn():whileTrue:time.sleep(5)print("Hello")thread_pool=concurrent.futures.ThreadPoolExecutor()thread_pool.submit(fn)whileTrue:time.sleep(1)print("Wow")主线程和工作线程都是死循环。因此,如果我使用KeyboardInt
我正在尝试在Django的单元测试框架中测试一些celery功能,但每当我尝试检查AsyncResult时,测试就像从未启动过一样。我知道这段代码在使用RabbitMQ的真实环境中工作,所以我只是想知道为什么在使用测试框架时它不起作用。这是一个例子:@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,CELERY_ALWAYS_EAGER=True,BROKER_BACKEND='memory',)deftest_celery_do_work(self):result=myapp.tasks.celery_do_work
我刚刚升级到celery3.1,现在我在我的日志中看到了这个::on_node_lost-INFO-missedheartbeatfromcelery@queue_nameforeveryqueue/workerinmycluster.根据文档,BROKER_HEARTBEAT默认关闭,我还没有配置它。我应该明确设置BROKER_HEARTBEAT=0还是应该检查其他内容? 最佳答案 Celery3.1添加了新的mingle和gossip程序。我也得到了很多错过的心跳和传递--without-gossip给我的worker清除了它。
在Celery中,如果出现异常,您可以重试任何任务。你可以这样做:@task(max_retries=5)defdiv(a,b):try:returna/bexceptZeroDivisionError,exc:raisediv.retry(exc=exc)在这种情况下,如果你想除以零,任务将被重新执行五次。但是您必须显式检查代码中的错误。如果您跳过try-exceptblock,任务将不会重新启动。我希望我的函数看起来像:@celery.task(autoretry_on=ZeroDivisionError,max_retries=5)defdiv(a,b):returna/b
我正在用celerybeat运行django+celery,但出现了这个错误.../local/lib/python2.7/site-packages/celery/beat.py",line367,insetup_schedulewriteback=True)File"/usr/lib/python2.7/shelve.py",line239,inopenreturnDbfilenameShelf(filename,flag,protocol,writeback)File"/usr/lib/python2.7/shelve.py",line223,in__init__Shelf.__
我有一个Django1.7项目和Celery3.1。我的Django项目中的所有应用程序都使用新的AppConfig。问题是并不是所有的任务都可以通过autodiscover_tasks找到:app.autodiscover_tasks(settings.INSTALLED_APPS)如果我像这样使用autodiscover_tasks它将起作用:app.autodiscover_tasks(settings.INSTALLED_APPS+('apps.core','apps.sales'))找到网站中定义的任务,但找不到核心和销售中的任务。所有的布局都与apps.py和tasks.p