草庐IT

python - 将 celery 优先级队列与广播任务一起使用

我想在我的celeryworker中实现任务优先级。我可以通过为高优先级任务和低优先级任务创建不同的队列来做到这一点。但我还需要将广播任务发送给所有具有广播队列且无法正常工作的工作人员。这是tasks.py文件:fromceleryimportCeleryfromkombu.commonimportBroadcast,Queue,Exchangeapp=Celery('tasks')app.conf.update(CELERY_RESULT_BACKEND='amqp',CELERY_ACCEPT_CONTENT=['json'],CELERY_TASK_SERIALIZER='jso

python - 一些 Celery 任务有效,其他任务未注册

我关注theCeleryDjango教程和我在示例(add,mul)中看到的任务非常适合我。当我执行res=add.delay(1,2);时,我得到了正确的响应res.get()。但是当我尝试执行另一个任务时,我得到了***NotRegistered:u'pipeline.tasks.sayhello'res=sayhello.delay('trex')。如果我执行res=sayhello('trex'),那么只需键入res即可获得结果。但是通过这种方式,我在不使用Celery的情况下执行了函数。只有当我在Djangoshell./manageshell中运行它时,任务才有效>>>re

python - celery 不释放内存

看起来celery在任务完成后不会释放内存。每次任务完成时,都会有5m-10m的内存泄漏。因此,如果有数千个任务,很快就会用完所有内存。BROKER_URL='amqp://user@localhost:5672/vhost'#CELERY_RESULT_BACKEND='amqp://user@localhost:5672/vhost'CELERY_IMPORTS=('tasks.tasks',)CELERY_IGNORE_RESULT=TrueCELERY_DISABLE_RATE_LIMITS=True#CELERY_ACKS_LATE=TrueCELERY_TASK_RESUL

python - Celery - 最小化内存消耗

我们有约300个celeryd进程在Ubuntu10.464位下运行,在空闲状态下每个进程占用约19mbRES,约174mbVIRT,因此-所有进程约有6GB的空闲RAM。处于事件状态-进程最多占用100mb的RES和~300mb的VIRT每个进程都使用了minidom(xml文件问题是-我们如何减少RAM消耗-至少对于空闲的工作人员来说,可能一些celery或python选项可能会有所帮助?如何判断哪个部分占用内存最多?UPD:那是航类搜索代理,一个机构/日期的一名工作人员。我们有10个机构,一个用户搜索==9个日期,因此每个用户搜索有10*9个代理。是否可以按需启动celeryd进

python - 如何使用参数检索挂起和正在执行的 Celery 任务?

在Celery文档中,有example检查正在执行的任务:Youcangetalistofactivetasksusingactive():>>>i.active()[{'worker1.example.com':[{'name':'tasks.sleeptask','id':'32666e9b-809c-41fa-8e93-5ae0c80afbbf','args':'(8,)','kwargs':'{}'}]}]但是这个调用只返回参数的表示,由repr()获得。有没有办法获取序列化任务参数? 最佳答案 好的,我将把这个作为答案放入

python - 将 Celery 与现有的 RabbitMQ 消息一起使用

我有一个现有的RabbitMQ部署,一些Java应用程序正在使用发送日志消息作为各种channel上的字符串JSON对象。我想使用Celery来消费这些消息并将它们写入不同的地方(例如DB、Hadoop等)。我可以看到Celery被设计为RabbitMQ消息的生产者和消费者,因为它试图隐藏传递这些消息的机制。无论如何让Celery消费由另一个应用程序创建的消息并在它们到达时运行作业? 最佳答案 目前很难将自定义消费者添加到celeryworker,但在开发版本(成为3.1)中我添加了对消费者引导步骤的支持,这正在发生变化。还没有文档

python - celery worker 和命令行参数

我正在重构我的代码以使用celeryworker。之前我使用argparse来传递命令行参数。例如if__name__=="__main__":parser=argparse.ArgumentParser(description='Node')parser.add_argument('--environment',action="store",default='local',help="enve.g.productionofdevelopment")environment=arg_options.environment但现在我得到了这个错误。celery-Atasksworker--l

python - celery 不适用于全局变量

fromceleryimportCeleryapp=Celery('tasks',backend='amqp://guest@localhost//',broker='amqp://guest@localhost//')a_num=0@app.taskdefaddone():globala_numa_num=a_num+1returna_num这是我用来测试celery的代码。我希望每次使用addone()时返回值都会增加。但它总是1为什么???结果python>>fromtasksimportaddone>>r=addone.delay()>>r.get()1>>r=addone.d

python - 删除 celery 中的 Task/PeriodicTask

如何删除celery中的常规Task或PeriodicTask? 最佳答案 您撤销任务:参见documentation:Control.revoke(task_id,destination=None,terminate=False,signal='SIGTERM',**kwargs)Tellall(orspecific)workerstorevokeataskbyid.Ifataskisrevoked,theworkerswillignorethetaskandnotexecuteitafterall.Parameters:task

python - 使用 Celery 初始化带有参数的 worker

我在寻找对我来说似乎相对简单的东西时遇到了问题。我正在使用Celery3.1和Python3,我想用参数初始化我的工作人员,以便他们可以使用这些详细信息进行设置。具体而言:这些工作人员将执行需要使用身份验证凭据与第三方API交互的任务。工作人员有必要在执行任何任务之前将身份验证详细信息传递给API服务器(身份验证详细信息在第一次身份验证请求后存储在cookie中)。我想在工作人员从CLI启动时将这些登录凭据传递给工作人员。然后我希望工作人员使用它们进行身份验证并存储session以供在使用future任务时使用(理想情况下这将存储在可以从任务访问的属性中)。Celery可以吗?作为旁注