草庐IT

python - celery 结果后端。 DisabledBackend 对象没有属性 _get_task_meta_for

我已经配置了celery和后端:cleryapp=Celery('tasks_app',brocker='amqp://guest@localhost//',backend='db+postgresql://guest@localhost:5432')当我启动worker时,'results'似乎被禁用,但我在这里读到另一个问题,这不是问题所在。数据库正在正确获取所有数据,但是result=AsyncResult(task_id)加注AttributeError:'DisabledBackend'objecthasnoattribute'_get_task_meta_for'

python - celery 节拍 : Limit to single task instance at a time

我有celerybeat和celery(四个worker)批量做一些加工步骤。其中一项任务大致是这样的:“对于每个尚未创建Y的X,创建一个Y。”任务以半快速(10秒)的速度定期运行。任务完成得非常快。还有其他任务正在进行中。我已经多次遇到节拍任务明显积压的问题,因此同一任务(来自不同的节拍时间)同时执行,导致错误地重复工作。任务似乎也是乱序执行的。是否可以限制celerybeat以确保一次只有一个未完成的任务实例?在任务上设置类似rate_limit=5的设置是否是执行此操作的“正确”方法?是否可以确保节拍任务按顺序执行,例如beat不是分派(dispatch)任务,而是将其添加到任务

python - celery 中@task装饰器之后的装饰器

我正在尝试在celery@task装饰器之后应用一个装饰器,例如。@send_email@taskdefany_function():print"insidethefunction"我可以按照文档中推荐的方式让它工作,即将装饰器放在任务装饰器之前,但在这种情况下,我想在我的装饰器中访问任务实例。@send_email必须是类装饰器,这是我尝试但没有成功的方法:classsend_email(object):'''wrapsaTaskceleryclass'''def__init__(self,obj):self.wrapped_obj=objfunctools.update_wrapp

python - 从未知任务中检索 Celery 中 'task_id' 的结果

如果我之前不知道执行了哪个任务,如何提取任务的结果?这是设置:给定以下来源('tasks.py'):fromceleryimportCeleryapp=Celery('tasks',backend="db+mysql://u:p@localhost/db",broker='amqp://guest:guest@localhost:5672//')@app.taskdefadd(x,y):returnx+y@app.taskdefmul(x,y):returnx*y在本地运行RabbitMQ3.3.2:marcs-mbp:sbinmarcstreeter$./rabbitmq-serve

python - 属性错误 : module 'asyncio' has no attribute 'create_task'

我正在尝试asyncio.create_task()但我正在处理这个错误:这是一个例子:importasyncioimporttimeasyncdefasync_say(delay,msg):awaitasyncio.sleep(delay)print(msg)asyncdefmain():task1=asyncio.create_task(async_say(4,'hello'))task2=asyncio.create_task(async_say(6,'world'))print(f"startedat{time.strftime('%X')}")awaittask1awaitt

python - 为什么我在 Python asyncio 中收到 "Task was destroyed but it is pending"错误?

我使用asyncio和漂亮的aiohttp。主要思想是我向服务器发出请求(它返回链接)然后我想从所有链接下载文件parallel(类似于example)。代码:importaiohttpimportasyncio@asyncio.coroutinedefdownloader(file):print('Download',file['title'])yieldfromasyncio.sleep(1.0)#someactionstodownloadprint('OK',file['title'])defrun():r=yieldfromaiohttp.request('get','my_u

python - 删除 celery 中的 Task/PeriodicTask

如何删除celery中的常规Task或PeriodicTask? 最佳答案 您撤销任务:参见documentation:Control.revoke(task_id,destination=None,terminate=False,signal='SIGTERM',**kwargs)Tellall(orspecific)workerstorevokeataskbyid.Ifataskisrevoked,theworkerswillignorethetaskandnotexecuteitafterall.Parameters:task

python - 使用 add_periodic_task 在 Celery (celerybeat) 中动态设置周期性任务

我正在使用Celery4.0.1和Django1.10并且我在安排任务时遇到了问题(运行任务正常)。这是celery配置:os.environ.setdefault('DJANGO_SETTINGS_MODULE','myapp.settings')app=Celery('myapp')app.autodiscover_tasks(lambda:settings.INSTALLED_APPS)app.conf.BROKER_URL='amqp://{}:{}@{}'.format(settings.AMQP_USER,settings.AMQP_PASSWORD,settings.AM

python - 你在哪里设置 celery 任务的 task_id?

我找不到任何用我自己的task_id设置task_id的例子类似的东西...deftestview1(request):foriinxrange(0,1000):result=add.delay(i,4,task_id=i)printresult.info#value=result.wait()returnHttpResponse("Done")@task()defadd(task_id,x,y):printadd.task_idprintstr(x+y)returnx+y 最佳答案 delay不支持options,它是apply_

python - 谷歌应用引擎 : task_retry_limit doesn't work?

我有一个PythonGAE应用。我希望我的任务停止运行,或者在失败时重试一次。现在,无论我的yaml文件告诉它们什么,它们都会永远运行!这是一个queue.yaml条目:-name:globalPurchaserate:10/sbucket_size:100retry_parameters:task_retry_limit:1如果globalPurchase任务失败并返回500错误代码,它将永远重试,直到成功并在日志中显示此消息:“队列“globalPurchase”上名为“task14”的任务失败,代码为500;将在30秒后重试”为什么实际上没有使用task_retry_limit?