我希望celery任务能够获取执行它的工作人员的姓名,以便进行日志记录。我需要在任务中处理这个问题,而不是直接查询代理。有没有办法做到这一点?如果重要的话,我正在将celery与RabbitMQ一起使用。 最佳答案 使用celeryd_after_setup信号像这样捕获worker名称:fromcelery.signalsimportceleryd_after_setup@celeryd_after_setup.connectdefcapture_worker_name(sender,instance,**kwargs):os.e
我想设置一个网络发布/订阅事件系统,但还需要能够异步运行任务。我曾尝试让celery来完成繁重的工作,但我觉得我正在尝试填充一大堆东西来让它工作。我有两台机器(输入和输出),它们都可以访问RabbitMQ。我想让一个主程序启动一个等待输入的循环(网络摄像头检测到的运动)。我已经设置了input_machine启动main.py,它启动了一个celery任务,该任务由input_machine上的工作人员监控到“输入”队列。这个任务只运行whileTrue循环,直到检测到一些输入,然后它调用另一个名为('project.entered_room',什么都不做)celery任务到“输出”队
假设我有两个celery任务:@celery.taskdefrun_flakey_things(*args,**kwargs):returnrun_flakey_and_synchronous_thing.map(xrange(10)).apply_async()@celery.taskdefrun_flakey_and_synchronous_thing(a):ifa%5:returnaraiseRuntimeError(a)因此,当您运行run_flakey_things时,它会立即失败,因为序列中的第一项会引发异常。我想要的是像map一样按顺序为序列中的所有项目运行任务,但继续运
我有两种任务。任务A由celerybeat每小时生成一次。它会立即运行,并生成一千(或几千)个任务B实例,每个实例的ETA都是future一天。启动时,任务A的一个实例运行并生成一千个B。从那时起,什么也没有发生。我应该看到另一个A每小时运行一次,还有一千个B。但实际上我什么也没看到。在卡住时,rabbitmqctl显示1000条消息,其中968条准备就绪,32条未确认。一小时后,有1001条消息,其中969条已准备就绪,32条未确认。依此类推,每小时一条新消息被归类为就绪。据推测,正在发生的事情是工作人员正在预取32条消息,但无法对它们采取行动,因为它们的ETA仍在未来。与此同时,本
如何测试任务(task_id)是否仍在celery中处理?我有以下场景:在DjangoView中启动任务在session中存储BaseAsyncResult关闭celery守护进程(硬)以便不再处理任务检查任务是否“已死”有什么想法吗?能否查找celery正在处理的所有任务并检查我的任务是否仍然存在? 最佳答案 在你的模型中定义一个字段(PickledObjectField)来存储celery任务:classYourModel(models.Model):..celery_task=PickledObjectField()..def
在使用Ctlr-C键盘中断向Celeryworker发出两次TERM信号(热关机和冷关机)后,Celeryworker就被挂断了。它没有使用消息或执行任务(如预期的那样),但也没有关闭。我在Celery进程上运行strace以查看幕后发生的事情。这是Celery主进程的PID上的strace输出strace-p27867Process27867attached-interrupttoquitfutex(0xb966a78,FUTEX_WAIT,0,NULL这是我在子进程上执行strace时发现的:strace-p27874Process27874attached-interruptto
我安装了以下版本的celery和rabbitmq-celery3.1.6rabbitmq3.1.1我可以从PHP将任务发布到默认队列-//client.phpPostTask('tasks.add',array(2,2));我的worker模块在python中-#tasks.pyfromceleryimportCelerycelery=Celery('tasks',broker='amqp://guest:guest@localhost:5672//')@celery.task(queue='demo',name='add')defadd(x,y):returnx+y我像这样运行cel
我是celery的新手,我尝试将这个任务队列集成到我的项目中,但我仍然不明白celery如何处理失败的任务,我想将所有这些保留在amqp死信中排队。根据文档here似乎在启用了acks_late的任务中引发Reject会产生与确认消息相同的效果,然后我们再谈谈死信队列。所以我在我的celery配置中添加了一个自定义默认队列celery_app.conf.update(CELERY_ACCEPT_CONTENT=['application/json'],CELERY_TASK_SERIALIZER='json',CELERY_QUEUES=[CELERY_QUEUE,CELERY_DLX
背景我正在研究使用celery(3.1.8)来处理每个巨大的文本文件(~30GB)。这些文件在fastq中格式并包含大约118M测序“读数”,它们基本上都是header、DNA序列和质量字符串的组合)。此外,这些序列来自双端测序运行,因此我同时迭代两个文件(通过itertools.izip)。我希望能够做的是获取每对读取,将它们发送到队列,并让它们在我们集群中的一台机器上进行处理(不管是哪台机器)以返回清理后的版本如果需要进行清洁(例如,基于质量),则读取。我已经设置了celery和rabbitmq,我的workers启动如下:celeryworker-Atasks--autorelo
是否有用于消除Celery任务的标准方法?例如,一个任务可以多次“启动”,但在一些延迟后只会运行一次:defdebounce_task(task):iftask_is_queued(task):returntask.apply_async(countdown=30) 最佳答案 这是我们使用Redis计数器的方法。所有这些可能都可以在装饰器中概括,但我们只将它用于特定任务(webhooks)您的面向公众的任务是您从其他职能部门调用的任务。它需要在Redis中增加一个键。键由函数的参数组成,无论它们是什么(这确保计数器在各个任务中是唯一