我正在尝试使用django和rabbit-mq设置celeryd。到目前为止,我已经完成了以下工作:从pip安装celery通过其存储库中可用的deb安装了rabbitmq通过rabbitmqctl向rabbitmq添加了一个用户和虚拟主机,以及该用户的权限启动rabbitmq-server通过pip安装了django-celery设置django-celery,包括它的表在settings.py中配置各种内容(BROKER_HOST、BROKER_PORT、BROKER_USER、BROKER_PASSWORD、BROKER_VHOST,以及导入djecelery、调用设置函数并将其
当我将任务路由到特定队列时,它会起作用:task.apply_async(queue='beetroot')但是如果我创建一个链:chain=task|task然后我写chain.apply_async(queue='beetroot')它似乎忽略了queue关键字并分配给默认的'celery'队列。如果celery支持链式路由就好了——所有任务都在同一个队列中按顺序执行。 最佳答案 我是这样做的:subtask=task.s(*myargs,**mykwargs).set(queue=myqueue)mychain=celery.
Celery包含一个模块,该模块能够使用amqp或其他一些celery后端发出异步HTTP请求。我正在使用tornado-celery异步消息发布的生产者。据我了解tornado-celery为此使用鼠兔。问题是如何为Tornado调整celery.task.http.URL(使其成为非阻塞)。基本上有两个地方需要细化:HttpDispatch.make_request()必须使用tornado异步http客户端实现;URL.get_async(**kw)或URL.post_async(**kw)必须使用tornadoAPI以相应的非阻塞代码重新实现。例如:classNonBlocki
我正在使用Celery和RabbitMQ来处理来自API请求的数据。流程如下:请求>API>RabbitMQ>CeleryWorker>返回理想情况下,我会产生更多的celeryworker,但我受限于内存限制。目前,我的流程中的瓶颈是从传递给工作人员的URL中获取和下载数据。粗略的,流程大概是这样的:defcelery_gets_job(url):data=fetches_url(url)#takes0.1sto1.0s(bottleneck)result=processes_data(data)#takes0.1sreturnresult这是NotAcceptable,因为工作人员
我想利用Celery(使用RabbitMQ作为后端MQ)通过不同的队列执行不同风格的任务。一个要求是来自特定队列的(由工作人员)消费应该具有暂停和恢复的能力。celery,好像有thiscapability通过调用add_consumer和cancel_consumer。虽然我能够取消特定工作人员队列中任务的消费,但我无法通过调用add_consumer让工作人员恢复消费。Thecodetoreproducethisissueisprovidedhere.我的猜测可能是我缺少某种在celeryconfig中或在启动worker时通过参数提供的参数?如果能对此有一些新的看法,那就太好了。
很多天后,我有了一个工作的celery和celery节拍任务列表,结果使用django_celery_results存储。但是,当我查看表记录时,其中没有任何有用的信息。是否可以将任务ID设置为人类可读的内容?一个例子是使用demo任务,它返回no,但是一个不可读的任务id任务.py@app.taskdeftest(a,b):returna+bapp.settings中的调度程序CELERYBEAT_SCHEDULE={'test_task':{'task':'home.tasks.test','schedule':crontab(minute='*/1'),},
如何防止Celery在上一次执行完成之前执行周期性任务?我有一个服务器集群,链接到一个公共(public)数据库服务器,执行Celery任务,我发现每个服务器可能偶尔同时运行相同的任务,以及不同的服务器同时运行相同的任务。这会导致大量竞态条件以极其微妙的方式破坏我的数据。我一直在阅读Celery'sdocs,但我找不到任何明确允许这样做的选项。我找到了一个similarquestion,但建议的修复程序似乎是一个hack,因为它依赖于Django的缓存框架,因此可能不会被集群中的所有服务器共享,从而允许多个服务器同时执行相同的任务。Celery中有没有选项可以记录数据库中当前正在运行的
我的情况与概述的情况类似here,除了不是用多个参数链接任务,我想链接返回包含多个条目的字典的任务。这是——非常松散和抽象的——我正在尝试做的事情:任务.py@task()deftask1(item1=None,item2=None):item3=#dosomestuffwithitem1anditem2toyielditem3return_object=dict(item1=item1,item2=item2,item3=item3)returnreturn_objectdeftask2(item1=None,item2=None,item3=None):item4=#dosomet
我正在尝试在Django的单元测试框架中测试一些celery功能,但每当我尝试检查AsyncResult时,测试就像从未启动过一样。我知道这段代码在使用RabbitMQ的真实环境中工作,所以我只是想知道为什么在使用测试框架时它不起作用。这是一个例子:@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,CELERY_ALWAYS_EAGER=True,BROKER_BACKEND='memory',)deftest_celery_do_work(self):result=myapp.tasks.celery_do_work
我刚刚升级到celery3.1,现在我在我的日志中看到了这个::on_node_lost-INFO-missedheartbeatfromcelery@queue_nameforeveryqueue/workerinmycluster.根据文档,BROKER_HEARTBEAT默认关闭,我还没有配置它。我应该明确设置BROKER_HEARTBEAT=0还是应该检查其他内容? 最佳答案 Celery3.1添加了新的mingle和gossip程序。我也得到了很多错过的心跳和传递--without-gossip给我的worker清除了它。