草庐IT

celery_worker

全部标签

python - 用 celery 运行 "unique"任务

我使用celery来更新我的新闻聚合网站中的RSS提要。我为每个提要使用了一个@task,一切似乎都很好。虽然有一个细节我不确定如何处理好:所有提要每分钟使用@periodic_task更新一次,但是如果启动新任务时提要仍在从上一个定期任务更新怎么办?(例如,如果提要真的很慢,或者离线并且任务处于重试循环中)目前我存储任务结果并检查它们的状态,如下所示:importsocketfromdatetimeimporttimedeltafromcelery.decoratorsimporttask,periodic_taskfromaggregator.modelsimportFeed_re

python - 如何让 django celery 写入测试数据库以进行功能测试?

我正在开发一个Django应用程序。我们正在使用celery对Mongo数据库的写入进行排队。我正在尝试为一个在celery中排队的函数编写一个功能测试(使用Selenium)。问题是celery写入主Mongo数据库而不是测试数据库。如何设置我的功能测试以使用写入测试数据库的celery实例?我们使用'django_nose.NoseTestSuiteRunner'作为我们的TEST_RUNNER。更新:我无法弄清楚如何使用另一个celery实例进行测试,但我找到了一种绕过celery进行功能测试的方法。在我的settings.py中:FUNC_TEST_COMMAND=['func

python - 如何让 django celery 写入测试数据库以进行功能测试?

我正在开发一个Django应用程序。我们正在使用celery对Mongo数据库的写入进行排队。我正在尝试为一个在celery中排队的函数编写一个功能测试(使用Selenium)。问题是celery写入主Mongo数据库而不是测试数据库。如何设置我的功能测试以使用写入测试数据库的celery实例?我们使用'django_nose.NoseTestSuiteRunner'作为我们的TEST_RUNNER。更新:我无法弄清楚如何使用另一个celery实例进行测试,但我找到了一种绕过celery进行功能测试的方法。在我的settings.py中:FUNC_TEST_COMMAND=['func

python - 类型错误 : worker() takes 0 positional arguments but 1 was given

这个问题在这里已经有了答案:TypeError:method()takes1positionalargumentbut2weregiven(11个回答)关闭5个月前。我正在尝试实现一个子类,但它会抛出错误:TypeError:worker()接受0个位置参数,但给出了1个classKeyStatisticCollection(DataDownloadUtilities.DataDownloadCollection):defGenerateAddressStrings(self):passdefworker():passdefDownloadProc(self):pass

python - Celery AttributeError : async error

我在我的Mac(OS/X10.13.4)上本地运行RabbitMQ和Celery,当我运行add.delay(x,y)时,以下代码在本地运行:#!/usr/bin/envpythonfromceleryimportCeleryfromcelery.utils.logimportget_task_loggerlogger=get_task_logger(__name__)app=Celery('tasks',\broker='pyamqp://appuser:xx@c2/appvhost',\backend='db+mysql://appuser:xx@c2/pigpen')@app.t

python - 具有多处理功能的 Celery 并行分布式任务

我有一个CPU密集型Celery任务。我想使用跨大量EC2实例的所有处理能力(核心)来更快地完成这项工作(具有多处理功能的celery并行分布式任务-我认为)。线程、多处理、分布式计算、分布式并行处理这些术语都是我的术语试图更好地理解。示例任务:@app.taskforiteminlist_of_millions_of_ids:id=item#dosomelongcomplicatedequationhereveryCPUheavy!!!!!!!database.objects(newid=id).save()使用上面的代码(如果可能的话,还有一个例子)之前人们会如何使用Celery分

python - 使用指数回退重试 Celery 任务

对于这样的任务:fromcelery.decoratorsimporttask@task()defadd(x,y):ifnotxornoty:raiseException("testerror")returnself.wait_until_server_responds(如果它抛出异常并且我想从守护进程重试它,如何应用指数退避算法,即在2^2,2^3,2^4等秒之后?也是从服务器端维护重试,这样如果worker碰巧被杀死,那么下一个产生的worker将接受重试任务? 最佳答案 task.request.retries属性包含到目前为

python - 如何手动从 shell 运行 celery 定期任务?

我正在使用celery和django-celery。我已经定义了一个我想测试的定期任务。是否可以手动从shell运行定期任务以便查看控制台输出? 最佳答案 您是否尝试过仅从Djangoshell运行任务?您可以使用任务的.apply方法来确保它在本地快速运行。假设任务在tasks子模块中的Django应用myapp中称为my_task:$pythonmanage.pyshell>>>frommyapp.tasksimportmy_task>>>eager_result=my_task.apply()结果实例与通常的AsyncResu

python - 分布式任务队列(例如 Celery)与 crontab 脚本

我无法理解“分布式任务队列”的用途。比如python的celerylibrary.我知道在celery(python框架)中,您可以为函数设置定时窗口来执行。但是,这也可以在针对python脚本的linuxcrontab中轻松完成。据我所知,从我自己的django-celerywebapps中可以看出,celery比仅仅设置一个原始的crontab消耗更多的RAM内存。对于一个相对较小的应用程序来说只有几百MB的差异。有人可以帮我解决这个问题吗?或许对任务队列/crontab的一般工作方式进行高级解释也会很好。谢谢。 最佳答案 这取

python - 用 Celery 取消已经执行的任务?

我一直在阅读文档并进行搜索,但似乎找不到直接的答案:你能取消一个已经在执行的任务吗?(因为任务已经开始,需要一段时间,中途需要取消)我是从位于CeleryFAQ的文档中找到的>>>result=add.apply_async(args=[2,2],countdown=120)>>>result.revoke()但我不清楚这是否会取消排队的任务,或者是否会杀死工作人员上正在运行的进程。感谢您提供的任何启发! 最佳答案 revoke取消任务执行。如果任务被撤销,工作人员会忽略该任务并且不执行它。如果您不使用持久撤销,您的任务可以在工作人