草庐IT

celery_worker

全部标签

python - 在 Celery 中,我怎样才能防止长期延迟的任务阻塞更新的任务?

我有两种任务。任务A由celerybeat每小时生成一次。它会立即运行,并生成一千(或几千)个任务B实例,每个实例的ETA都是future一天。启动时,任务A的一个实例运行并生成一千个B。从那时起,什么也没有发生。我应该看到另一个A每小时运行一次,还有一千个B。但实际上我什么也没看到。在卡住时,rabbitmqctl显示1000条消息,其中968条准备就绪,32条未确认。一小时后,有1001条消息,其中969条已准备就绪,32条未确认。依此类推,每小时一条新消息被归类为就绪。据推测,正在发生的事情是工作人员正在预取32条消息,但无法对它们采取行动,因为它们的ETA仍在未来。与此同时,本

python - 测试 celery 任务是否仍在处理中

如何测试任务(task_id)是否仍在celery中处理?我有以下场景:在DjangoView中启动任务在session中存储BaseAsyncResult关闭celery守护进程(硬)以便不再处理任务检查任务是否“已死”有什么想法吗?能否查找celery正在处理的所有任务并检查我的任务是否仍然存在? 最佳答案 在你的模型中定义一个字段(PickledObjectField)来存储celery任务:classYourModel(models.Model):..celery_task=PickledObjectField()..def

python - 为什么 celery 没有干净地关闭?

在使用Ctlr-C键盘中断向Celeryworker发出两次TERM信号(热关机和冷关机)后,Celeryworker就被挂断了。它没有使用消息或执行任务(如预期的那样),但也没有关闭。我在Celery进程上运行strace以查看幕后发生的事情。这是Celery主进程的PID上的strace输出strace-p27867Process27867attached-interrupttoquitfutex(0xb966a78,FUTEX_WAIT,0,NULL这是我在子进程上执行strace时发现的:strace-p27874Process27874attached-interruptto

php - 如何在 PHP 的 celery-rabbitmq 队列上发布任务?

我安装了以下版本的celery和rabbitmq-celery3.1.6rabbitmq3.1.1我可以从PHP将任务发布到默认队列-//client.phpPostTask('tasks.add',array(2,2));我的worker模块在python中-#tasks.pyfromceleryimportCelerycelery=Celery('tasks',broker='amqp://guest:guest@localhost:5672//')@celery.task(queue='demo',name='add')defadd(x,y):returnx+y我像这样运行cel

python - Celery:如何将失败的任务路由到死信队列

我是celery的新手,我尝试将这个任务队列集成到我的项目中,但我仍然不明白celery如何处理失败的任务,我想将所有这些保留在amqp死信中排队。根据文档here似乎在启用了acks_late的任务中引发Reject会产生与确认消息相同的效果,然后我们再谈谈死信队列。所以我在我的celery配置中添加了一个自定义默认队列celery_app.conf.update(CELERY_ACCEPT_CONTENT=['application/json'],CELERY_TASK_SERIALIZER='json',CELERY_QUEUES=[CELERY_QUEUE,CELERY_DLX

python - 使用 celery 处理巨大的文本文件

背景我正在研究使用celery(3.1.8)来处理每个巨大的文本文件(~30GB)。这些文件在fastq中格式并包含大约118M测序“读数”,它们基本上都是header、DNA序列和质量字符串的组合)。此外,这些序列来自双端测序运行,因此我同时迭代两个文件(通过itertools.izip)。我希望能够做的是获取每对读取,将它们发送到队列,并让它们在我们集群中的一台机器上进行处理(不管是哪台机器)以返回清理后的版本如果需要进行清洁(例如,基于质量),则读取。我已经设置了celery和rabbitmq,我的workers启动如下:celeryworker-Atasks--autorelo

python - 去抖 celery 任务?

是否有用于消除Celery任务的标准方法?例如,一个任务可以多次“启动”,但在一些延迟后只会运行一次:defdebounce_task(task):iftask_is_queued(task):returntask.apply_async(countdown=30) 最佳答案 这是我们使用Redis计数器的方法。所有这些可能都可以在装饰器中概括,但我们只将它用于特定任务(webhooks)您的面向公众的任务是您从其他职能部门调用的任务。它需要在Redis中增加一个键。键由函数的参数组成,无论它们是什么(这确保计数器在各个任务中是唯一

Python:确保 os.environ 和 sys.path 相等:web-requests、shell、cron、celery

我想确保os.environ和sys.path对于我们启动Python解释器的所有方式都是相同的:通过Django和Apachemod_wsgi的网络请求计划任务通过ssh交互式登录celery作业通过systemd启Action业有解决这个问题的通用方法吗?如果是,那就太好了:它看起来怎么样?如果没有,很伤心:每个人都会自己解决这个问题。...解决这个问题的好方法是什么?操作系统:Linux(支持systemd)更新更明确:我希望sys.path在网络请求、cron作业、从shell启动的python中相同,...我希望os.environ在Web请求、cron作业、从shell启动

worker 中的 Python 多处理和处理异常

我将python多处理库用于一种算法,其中我有许多工作人员处理某些数据并将结果返回给父进程。我使用multiprocessing.Queue将作业传递给工作人员,然后收集结果。一切都很好,直到worker无法处理某些数据block。在下面的简化示例中,每个工作人员都有两个阶段:初始化-可能会失败,在这种情况下worker应该被销毁数据处理——处理一block数据可能会失败,在这种情况下,worker应该跳过这个block并继续处理下一个数据。当这两个阶段中的任何一个失败时,我都会在脚本完成后陷入僵局。此代码模拟了我的问题:importmultiprocessingasmpimportr

python - celery :不允许守护进程有 child

在Python(2.7)中,我尝试在celery任务(celery3.1.17)中创建进程(使用多处理),但它给出了错误:daemonicprocessesarenotallowedtohavechildren谷歌搜索,我发现最新版本的billiard修复了“错误”,但我有最新版本(3.3.0.20),但错误仍在发生。我还尝试实现thisworkaround在我的celery任务中,但它给出了同样的错误。有人知道怎么做吗?任何帮助表示赞赏,帕特里克编辑:代码片段任务:from__future__importabsolute_importfromceleryimportshared_ta