草庐IT

celery_tasks

全部标签

python - 查看 celery 任务是否存在

是否有可能找出具有特定任务id的任务是否存在?当我尝试获取状态时,我总是处于待处理状态。>>>AsyncResult('...').status'PENDING'我想知道给定的任务id是否是真正的celery任务id而不是随机字符串。我想要不同的结果,具体取决于某个id是否存在有效任务。过去可能存在具有相同id的有效任务,但结果可能已从后端删除。 最佳答案 Celery在发送任务时不会写入状态,这部分是一种优化(参见documentation)。如果你真的需要,添加很简单:fromceleryimportcurrent_app#`a

python - 如何在 celery 中禁止泡菜序列化

Celery默认使用pickle作为任务的序列化方法。如FAQ中所述,这代表了一个安全漏洞。Celery允许您使用CELERY_TASK_SERIALIZER配置参数配置任务如何序列化。但这并不能解决安全问题。即使任务使用JSON或类似的方式进行序列化,worker仍将使用pickle序列化执行插入到队列中的任务——它们只是响应消息中的content-type参数。因此,任何可以写入任务队列的人都可以通过编写恶意腌制对象来有效地控制工作进程。如何防止工作线程运行用pickle序列化的任务? 最佳答案 我收到“ContentDisal

python - 检测 Celery 是否可用/正在运行

我正在使用Celery管理异步任务。然而,偶尔,celery进程会停止,这会导致没有任何任务被执行。我希望能够检查celery的状态并确保一切正常,如果我检测到任何问题,则会向用户显示错误消息。从CeleryWorker文档看来,我可以使用ping或inspect为此,但是ping感觉很笨拙,并且不清楚究竟是如何使用inspect的(如果inspect().registered()是空的?)。对此的任何指导将不胜感激。基本上我正在寻找的是这样的方法:defcelery_is_alive():fromcelery.task.controlimportinspectreturnbool(i

python - 用 celery 运行 "unique"任务

我使用celery来更新我的新闻聚合网站中的RSS提要。我为每个提要使用了一个@task,一切似乎都很好。虽然有一个细节我不确定如何处理好:所有提要每分钟使用@periodic_task更新一次,但是如果启动新任务时提要仍在从上一个定期任务更新怎么办?(例如,如果提要真的很慢,或者离线并且任务处于重试循环中)目前我存储任务结果并检查它们的状态,如下所示:importsocketfromdatetimeimporttimedeltafromcelery.decoratorsimporttask,periodic_taskfromaggregator.modelsimportFeed_re

python - 如何让 django celery 写入测试数据库以进行功能测试?

我正在开发一个Django应用程序。我们正在使用celery对Mongo数据库的写入进行排队。我正在尝试为一个在celery中排队的函数编写一个功能测试(使用Selenium)。问题是celery写入主Mongo数据库而不是测试数据库。如何设置我的功能测试以使用写入测试数据库的celery实例?我们使用'django_nose.NoseTestSuiteRunner'作为我们的TEST_RUNNER。更新:我无法弄清楚如何使用另一个celery实例进行测试,但我找到了一种绕过celery进行功能测试的方法。在我的settings.py中:FUNC_TEST_COMMAND=['func

python - 如何让 django celery 写入测试数据库以进行功能测试?

我正在开发一个Django应用程序。我们正在使用celery对Mongo数据库的写入进行排队。我正在尝试为一个在celery中排队的函数编写一个功能测试(使用Selenium)。问题是celery写入主Mongo数据库而不是测试数据库。如何设置我的功能测试以使用写入测试数据库的celery实例?我们使用'django_nose.NoseTestSuiteRunner'作为我们的TEST_RUNNER。更新:我无法弄清楚如何使用另一个celery实例进行测试,但我找到了一种绕过celery进行功能测试的方法。在我的settings.py中:FUNC_TEST_COMMAND=['func

python - Celery AttributeError : async error

我在我的Mac(OS/X10.13.4)上本地运行RabbitMQ和Celery,当我运行add.delay(x,y)时,以下代码在本地运行:#!/usr/bin/envpythonfromceleryimportCeleryfromcelery.utils.logimportget_task_loggerlogger=get_task_logger(__name__)app=Celery('tasks',\broker='pyamqp://appuser:xx@c2/appvhost',\backend='db+mysql://appuser:xx@c2/pigpen')@app.t

python - 具有多处理功能的 Celery 并行分布式任务

我有一个CPU密集型Celery任务。我想使用跨大量EC2实例的所有处理能力(核心)来更快地完成这项工作(具有多处理功能的celery并行分布式任务-我认为)。线程、多处理、分布式计算、分布式并行处理这些术语都是我的术语试图更好地理解。示例任务:@app.taskforiteminlist_of_millions_of_ids:id=item#dosomelongcomplicatedequationhereveryCPUheavy!!!!!!!database.objects(newid=id).save()使用上面的代码(如果可能的话,还有一个例子)之前人们会如何使用Celery分

python - 使用指数回退重试 Celery 任务

对于这样的任务:fromcelery.decoratorsimporttask@task()defadd(x,y):ifnotxornoty:raiseException("testerror")returnself.wait_until_server_responds(如果它抛出异常并且我想从守护进程重试它,如何应用指数退避算法,即在2^2,2^3,2^4等秒之后?也是从服务器端维护重试,这样如果worker碰巧被杀死,那么下一个产生的worker将接受重试任务? 最佳答案 task.request.retries属性包含到目前为

python - 如何手动从 shell 运行 celery 定期任务?

我正在使用celery和django-celery。我已经定义了一个我想测试的定期任务。是否可以手动从shell运行定期任务以便查看控制台输出? 最佳答案 您是否尝试过仅从Djangoshell运行任务?您可以使用任务的.apply方法来确保它在本地快速运行。假设任务在tasks子模块中的Django应用myapp中称为my_task:$pythonmanage.pyshell>>>frommyapp.tasksimportmy_task>>>eager_result=my_task.apply()结果实例与通常的AsyncResu