我正在使用celery组启动一组celery任务,如officialdocumentation中所述我还将组(任务集)ID存储到数据库中,以便轮询celery的任务集状态。job=group([single_test.s(1,1),single_test.s(1,2),single_test.s(1,3),])result=job.apply_async()test_set=MyTestSet()test_set.taskset_id=result.id#storetest_setintoDB有没有办法从任务集ID开始获取GroupResult对象(即我的result)?类似于this
我正在使用Celery4.0.1和Django1.10并且我在安排任务时遇到了问题(运行任务正常)。这是celery配置:os.environ.setdefault('DJANGO_SETTINGS_MODULE','myapp.settings')app=Celery('myapp')app.autodiscover_tasks(lambda:settings.INSTALLED_APPS)app.conf.BROKER_URL='amqp://{}:{}@{}'.format(settings.AMQP_USER,settings.AMQP_PASSWORD,settings.AM
我需要从models.py调用celery任务(在tasks.py中),唯一的问题是,tasks.py导入models.py,所以我无法从models.py导入tasks.py。有没有什么方法可以简单地使用它的名字来调用celery任务,而不必导入它?出于同样的原因(防止循环导入),对ForeignKey字段实现了类似的事情。 最佳答案 是的,有。您可以使用:fromcelery.executeimportsend_tasksend_task('my_task',[],kwargs)确保您的任务函数有一个名称:fromceleryi
我找不到任何用我自己的task_id设置task_id的例子类似的东西...deftestview1(request):foriinxrange(0,1000):result=add.delay(i,4,task_id=i)printresult.info#value=result.wait()returnHttpResponse("Done")@task()defadd(task_id,x,y):printadd.task_idprintstr(x+y)returnx+y 最佳答案 delay不支持options,它是apply_
这不是问题,而是对那些发现celery4.0.1文档中描述的周期性任务声明很难集成到django中的人有所帮助:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#entries复制粘贴celery配置文件main_app/celery.py:fromceleryimportCeleryfromcelery.schedulesimportcrontabapp=Celery()@app.on_after_configure.connectdefsetup_periodic_tasks(sender,*
我在我的应用程序中使用celery来运行周期性任务。让我们看下面的简单示例frommyqueueimportQueue@perodic_task(run_every=timedelta(minutes=1))defprocess_queue():queue=Queue()uid,questions=queue.pop()ifuidisNone:returnjob=group(do_stuff(q)forqinquestions)job.apply_async()defdo_stuff(question):try:...except:...raise正如您在上面的示例中看到的,我使用ce
我正在为Python寻找类似cron的分布式框架,并找到了Celery。然而,文档说“你必须确保一次只有一个调度程序在为一个计划运行,否则你最终会得到重复的任务”,Celery使用celery.beat.PersistentScheduler将计划存储到本地文件。所以,我的问题是,除了默认设置之外,还有其他实现可以将调度“放入集群”并协调任务执行,以便每个任务只运行一次吗?我的目标是能够在集群中的所有主机上以相同的时间表运行celerybeat。谢谢 最佳答案 tl;dr:没有Celerybeat不适合您的用例。您只需运行一个cel
我正在尝试为我的Django应用程序中的一些celery任务编写一些单元测试。这些任务将模型ID作为参数,执行一些操作并更新模型。在运行devserver和celeryworker时,一切都很好,但是在运行我的测试时,很明显celery任务没有使用作为测试运行的一部分创建和销毁的django测试数据库。问题是,我怎样才能让celery使用与其余测试相同的临时数据库?如您所见,我正在使用针对类似问题的每个答案中建议的设置覆盖。更新:发现如果我只是将对象本身传递给任务,而不是将对象id传递给任务并让任务从数据库中获取它,测试工作正常,显然对任务。所以至少现在,这将是我的解决办法。在我的测试
我以前有过这样的功能defcalculate(self,input):result=input*2ifresult>4:result_higher_then_four.send(result)returnresultresult_higher_then_four显然代表一个信号。然后我引入了celery,我的功能如下所示,我再也没有收到信号。我想信号是按进程绑定(bind)的,因为celery在不同的进程中运行,这意味着我无法在主进程中捕获信号。我应该使用thread_local来解决这个问题吗?还是我忽略了显而易见的事情?谢谢@taskdefcalculate(self,input)
TLDR;要为celery生成的每个进程运行初始化函数,您可以使用worker_process_init信号。正如您在docs中看到的那样,该信号的处理程序不应阻塞超过4秒。但是,如果我必须运行一个执行时间超过4秒的init函数,有什么选择呢?问题我使用C扩展模块在celery任务中运行某些操作。该模块需要初始化,可能需要几秒钟(可能是4-10秒)。因为我宁愿不为每个任务运行这个init函数,而是为每个生成的进程运行,我使用了worker_process_init信号:#lib.pyimportisclient#cextensionmoduleclient=Nonedefinit():