草庐IT

CELERY_ALWAYS_EAGER

全部标签

python - 如何使用参数检索挂起和正在执行的 Celery 任务?

在Celery文档中,有example检查正在执行的任务:Youcangetalistofactivetasksusingactive():>>>i.active()[{'worker1.example.com':[{'name':'tasks.sleeptask','id':'32666e9b-809c-41fa-8e93-5ae0c80afbbf','args':'(8,)','kwargs':'{}'}]}]但是这个调用只返回参数的表示,由repr()获得。有没有办法获取序列化任务参数? 最佳答案 好的,我将把这个作为答案放入

python - 将 Celery 与现有的 RabbitMQ 消息一起使用

我有一个现有的RabbitMQ部署,一些Java应用程序正在使用发送日志消息作为各种channel上的字符串JSON对象。我想使用Celery来消费这些消息并将它们写入不同的地方(例如DB、Hadoop等)。我可以看到Celery被设计为RabbitMQ消息的生产者和消费者,因为它试图隐藏传递这些消息的机制。无论如何让Celery消费由另一个应用程序创建的消息并在它们到达时运行作业? 最佳答案 目前很难将自定义消费者添加到celeryworker,但在开发版本(成为3.1)中我添加了对消费者引导步骤的支持,这正在发生变化。还没有文档

python - celery worker 和命令行参数

我正在重构我的代码以使用celeryworker。之前我使用argparse来传递命令行参数。例如if__name__=="__main__":parser=argparse.ArgumentParser(description='Node')parser.add_argument('--environment',action="store",default='local',help="enve.g.productionofdevelopment")environment=arg_options.environment但现在我得到了这个错误。celery-Atasksworker--l

python - celery 不适用于全局变量

fromceleryimportCeleryapp=Celery('tasks',backend='amqp://guest@localhost//',broker='amqp://guest@localhost//')a_num=0@app.taskdefaddone():globala_numa_num=a_num+1returna_num这是我用来测试celery的代码。我希望每次使用addone()时返回值都会增加。但它总是1为什么???结果python>>fromtasksimportaddone>>r=addone.delay()>>r.get()1>>r=addone.d

python - 删除 celery 中的 Task/PeriodicTask

如何删除celery中的常规Task或PeriodicTask? 最佳答案 您撤销任务:参见documentation:Control.revoke(task_id,destination=None,terminate=False,signal='SIGTERM',**kwargs)Tellall(orspecific)workerstorevokeataskbyid.Ifataskisrevoked,theworkerswillignorethetaskandnotexecuteitafterall.Parameters:task

python - 使用 Celery 初始化带有参数的 worker

我在寻找对我来说似乎相对简单的东西时遇到了问题。我正在使用Celery3.1和Python3,我想用参数初始化我的工作人员,以便他们可以使用这些详细信息进行设置。具体而言:这些工作人员将执行需要使用身份验证凭据与第三方API交互的任务。工作人员有必要在执行任何任务之前将身份验证详细信息传递给API服务器(身份验证详细信息在第一次身份验证请求后存储在cookie中)。我想在工作人员从CLI启动时将这些登录凭据传递给工作人员。然后我希望工作人员使用它们进行身份验证并存储session以供在使用future任务时使用(理想情况下这将存储在可以从任务访问的属性中)。Celery可以吗?作为旁注

python - 从 Celery 中的 taskset_id 检索 GroupResult?

我正在使用celery组启动一组celery任务,如officialdocumentation中所述我还将组(任务集)ID存储到数据库中,以便轮询celery的任务集状态。job=group([single_test.s(1,1),single_test.s(1,2),single_test.s(1,3),])result=job.apply_async()test_set=MyTestSet()test_set.taskset_id=result.id#storetest_setintoDB有没有办法从任务集ID开始获取GroupResult对象(即我的result)?类似于this

python - 使用 add_periodic_task 在 Celery (celerybeat) 中动态设置周期性任务

我正在使用Celery4.0.1和Django1.10并且我在安排任务时遇到了问题(运行任务正常)。这是celery配置:os.environ.setdefault('DJANGO_SETTINGS_MODULE','myapp.settings')app=Celery('myapp')app.autodiscover_tasks(lambda:settings.INSTALLED_APPS)app.conf.BROKER_URL='amqp://{}:{}@{}'.format(settings.AMQP_USER,settings.AMQP_PASSWORD,settings.AM

python - 按名称调用 Django celery 任务

我需要从models.py调用celery任务(在tasks.py中),唯一的问题是,tasks.py导入models.py,所以我无法从models.py导入tasks.py。有没有什么方法可以简单地使用它的名字来调用celery任务,而不必导入它?出于同样的原因(防止循环导入),对ForeignKey字段实现了类似的事情。 最佳答案 是的,有。您可以使用:fromcelery.executeimportsend_tasksend_task('my_task',[],kwargs)确保您的任务函数有一个名称:fromceleryi

python - 你在哪里设置 celery 任务的 task_id?

我找不到任何用我自己的task_id设置task_id的例子类似的东西...deftestview1(request):foriinxrange(0,1000):result=add.delay(i,4,task_id=i)printresult.info#value=result.wait()returnHttpResponse("Done")@task()defadd(task_id,x,y):printadd.task_idprintstr(x+y)returnx+y 最佳答案 delay不支持options,它是apply_