草庐IT

python - 如何向 Celery (celerybeat) 动态添加/删除周期性任务

如果我有如下定义的函数:defadd(x,y):returnx+y有没有办法将此函数动态添加为celeryPeriodicTask并在运行时启动它?我希望能够做类似(伪代码)的事情:some_unique_task_id=celery.beat.schedule_task(add,run_every=crontab(minute="*/30"))celery.beat.start(some_unique_task_id)我还想使用(伪代码)之类的东西动态停止或删除该任务:celery.beat.remove_task(some_unique_task_id)或celery.beat.s

python - 如何向 Celery (celerybeat) 动态添加/删除周期性任务

如果我有如下定义的函数:defadd(x,y):returnx+y有没有办法将此函数动态添加为celeryPeriodicTask并在运行时启动它?我希望能够做类似(伪代码)的事情:some_unique_task_id=celery.beat.schedule_task(add,run_every=crontab(minute="*/30"))celery.beat.start(some_unique_task_id)我还想使用(伪代码)之类的东西动态停止或删除该任务:celery.beat.remove_task(some_unique_task_id)或celery.beat.s

python - 运行更多任务的 Celery 任务

我正在使用celerybeat启动一项主要任务,该任务启动了许多次要任务。我已经写好了这两个任务。有没有办法轻松做到这一点?Celery是否允许在任务中运行任务?我的例子:@taskdefcompute(users=None):ifusersisNone:users=User.objects.all()tasks=[]foruserinusers:tasks.append(compute_for_user.subtask((user.id,)))job=TaskSet(tasks)job.apply_async()#raisesaIOError:Socketclosed@taskdef

python - 运行更多任务的 Celery 任务

我正在使用celerybeat启动一项主要任务,该任务启动了许多次要任务。我已经写好了这两个任务。有没有办法轻松做到这一点?Celery是否允许在任务中运行任务?我的例子:@taskdefcompute(users=None):ifusersisNone:users=User.objects.all()tasks=[]foruserinusers:tasks.append(compute_for_user.subtask((user.id,)))job=TaskSet(tasks)job.apply_async()#raisesaIOError:Socketclosed@taskdef

python - celery 与 RabbitMQ : AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'

我正在运行FirstStepswithCeleryTutorial.我们定义以下任务:fromceleryimportCeleryapp=Celery('tasks',broker='amqp://guest@localhost//')@app.taskdefadd(x,y):returnx+y然后调用它:>>>fromtasksimportadd>>>add.delay(4,4)但我收到以下错误:AttributeError:'DisabledBackend'objecthasnoattribute'_get_task_meta_for'我正在运行celeryworker和rabbi

python - celery 与 RabbitMQ : AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'

我正在运行FirstStepswithCeleryTutorial.我们定义以下任务:fromceleryimportCeleryapp=Celery('tasks',broker='amqp://guest@localhost//')@app.taskdefadd(x,y):returnx+y然后调用它:>>>fromtasksimportadd>>>add.delay(4,4)但我收到以下错误:AttributeError:'DisabledBackend'objecthasnoattribute'_get_task_meta_for'我正在运行celeryworker和rabbi

python - 在 Airflow 中创建动态工作流的正确方法

问题Airflow中是否有任何方法可以创建一个工作流,以便在任务A完成之前,任务B.*的数量是未知的?我查看了subdags,但它似乎只能用于必须在创建Dag时确定的一组静态任务。dag触发器会起作用吗?如果可以,请提供一个例子。我有一个问题,在任务A完成之前,无法知道计算任务C所需的任务B的数量。每个任务B.*都需要几个小时来计算,并且不能合并。|--->TaskB.1--||--->TaskB.2--|TaskA------|--->TaskB.3--|----->TaskC|....||--->TaskB.N--|想法#1我不喜欢这个解决方案,因为我必须创建一个阻塞的Extern

python - 在 Airflow 中创建动态工作流的正确方法

问题Airflow中是否有任何方法可以创建一个工作流,以便在任务A完成之前,任务B.*的数量是未知的?我查看了subdags,但它似乎只能用于必须在创建Dag时确定的一组静态任务。dag触发器会起作用吗?如果可以,请提供一个例子。我有一个问题,在任务A完成之前,无法知道计算任务C所需的任务B的数量。每个任务B.*都需要几个小时来计算,并且不能合并。|--->TaskB.1--||--->TaskB.2--|TaskA------|--->TaskB.3--|----->TaskC|....||--->TaskB.N--|想法#1我不喜欢这个解决方案,因为我必须创建一个阻塞的Extern

python - Celery - 获取当前任务的任务 ID

如何从任务中获取任务的task_id值?这是我的代码:fromcelery.decoratorsimporttaskfromdjango.core.cacheimportcache@taskdefdo_job(path):"Performsanoperationonafile"#...Codetoperformtheoperation...cache.set(current_task_id,operation_results)这个想法是,当我创建任务的新实例时,我会从任务对象中检索task_id。然后我使用任务ID来确定任务是否已完成。我不想希望通过path值跟踪任务,因为该文件在任务

python - Celery - 获取当前任务的任务 ID

如何从任务中获取任务的task_id值?这是我的代码:fromcelery.decoratorsimporttaskfromdjango.core.cacheimportcache@taskdefdo_job(path):"Performsanoperationonafile"#...Codetoperformtheoperation...cache.set(current_task_id,operation_results)这个想法是,当我创建任务的新实例时,我会从任务对象中检索task_id。然后我使用任务ID来确定任务是否已完成。我不想希望通过path值跟踪任务,因为该文件在任务