草庐IT

celery_tasks

全部标签

python - 你在哪里设置 celery 任务的 task_id?

我找不到任何用我自己的task_id设置task_id的例子类似的东西...deftestview1(request):foriinxrange(0,1000):result=add.delay(i,4,task_id=i)printresult.info#value=result.wait()returnHttpResponse("Done")@task()defadd(task_id,x,y):printadd.task_idprintstr(x+y)returnx+y 最佳答案 delay不支持options,它是apply_

python - 在 Django 中连接新的 celery 周期性任务

这不是问题,而是对那些发现celery4.0.1文档中描述的周期性任务声明很难集成到django中的人有所帮助:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#entries复制粘贴celery配置文件main_app/celery.py:fromceleryimportCeleryfromcelery.schedulesimportcrontabapp=Celery()@app.on_after_configure.connectdefsetup_periodic_tasks(sender,*

python - 回调 celery apply_async

我在我的应用程序中使用celery来运行周期性任务。让我们看下面的简单示例frommyqueueimportQueue@perodic_task(run_every=timedelta(minutes=1))defprocess_queue():queue=Queue()uid,questions=queue.pop()ifuidisNone:returnjob=group(do_stuff(q)forqinquestions)job.apply_async()defdo_stuff(question):try:...except:...raise正如您在上面的示例中看到的,我使用ce

python - 分布式 celery 调度器

我正在为Python寻找类似cron的分布式框架,并找到了Celery。然而,文档说“你必须确保一次只有一个调度程序在为一个计划运行,否则你最终会得到重复的任务”,Celery使用celery.beat.PersistentScheduler将计划存储到本地文件。所以,我的问题是,除了默认设置之外,还有其他实现可以将调度“放入集群”并协调任务执行,以便每个任务只运行一次吗?我的目标是能够在集群中的所有主机上以相同的时间表运行celerybeat。谢谢 最佳答案 tl;dr:没有Celerybeat不适合您的用例。您只需运行一个cel

python - 让 Celery 使用 Django 测试数据库

我正在尝试为我的Django应用程序中的一些celery任务编写一些单元测试。这些任务将模型ID作为参数,执行一些操作并更新模型。在运行devserver和celeryworker时,一切都很好,但是在运行我的测试时,很明显celery任务没有使用作为测试运行的一部分创建和销毁的django测试数据库。问题是,我怎样才能让celery使用与其余测试相同的临时数据库?如您所见,我正在使用针对类似问题的每个答案中建议的设置覆盖。更新:发现如果我只是将对象本身传递给任务,而不是将对象id传递给任务并让任务从数据库中获取它,测试工作正常,显然对任务。所以至少现在,这将是我的解决办法。在我的测试

python - celery 和信号

我以前有过这样的功能defcalculate(self,input):result=input*2ifresult>4:result_higher_then_four.send(result)returnresultresult_higher_then_four显然代表一个信号。然后我引入了celery,我的功能如下所示,我再也没有收到信号。我想信号是按进程绑定(bind)的,因为celery在不同的进程中运行,这意味着我无法在主进程中捕获信号。我应该使用thread_local来解决这个问题吗?还是我忽略了显而易见的事情?谢谢@taskdefcalculate(self,input)

python - Celery:运行冗长初始化函数的正确方法(每个进程)

TLDR;要为celery生成的每个进程运行初始化函数,您可以使用worker_process_init信号。正如您在docs中看到的那样,该信号的处理程序不应阻塞超过4秒。但是,如果我必须运行一个执行时间超过4秒的init函数,有什么选择呢?问题我使用C扩展模块在celery任务中运行某些操作。该模块需要初始化,可能需要几秒钟(可能是4-10秒)。因为我宁愿不为每个任务运行这个init函数,而是为每个生成的进程运行,我使用了worker_process_init信号:#lib.pyimportisclient#cextensionmoduleclient=Nonedefinit():

python - 任务状态和 django-celery

我使用django-celery并有这样的任务:classTestTask(Task):name="enabler.test_task"defrun(self,**kw):debug_log("begintesttask")time.sleep(5)debug_log("endtesttask")defon_success(self,retval,task_id,args,kwargs):debug_log("onsuccess")defon_failure(self,retval,task_id,args,kwargs):debug_log("onfailure")我使用django

python - Django Celery 实现 - OSError : [Errno 38] Function not implemented

我安装了django-celery并尝试启动工作服务器,但我收到一个OSError,表示某个功能未实现。我在VPS上运行CentOS5.4版(最终版):.broker->amqp://guest@localhost:5672/.queues->.celery->exchange:celery(direct)binding:celery.concurrency->4.loader->djcelery.loaders.DjangoLoader.logfile->[stderr]@WARNING.events->OFF.beat->OFF[2010-07-2217:10:01,364:WAR

python - 谷歌应用引擎 : task_retry_limit doesn't work?

我有一个PythonGAE应用。我希望我的任务停止运行,或者在失败时重试一次。现在,无论我的yaml文件告诉它们什么,它们都会永远运行!这是一个queue.yaml条目:-name:globalPurchaserate:10/sbucket_size:100retry_parameters:task_retry_limit:1如果globalPurchase任务失败并返回500错误代码,它将永远重试,直到成功并在日志中显示此消息:“队列“globalPurchase”上名为“task14”的任务失败,代码为500;将在30秒后重试”为什么实际上没有使用task_retry_limit?