我在我的服务器上安装了Django-Celery并尝试通过以下代码发送任务:$./manage.pyshellPython3.4.3(default,Oct142015,20:28:29)Type"copyright","credits"or"license"formoreinformation.IPython4.0.0--AnenhancedInteractivePython.?->IntroductionandoverviewofIPython'sfeatures.%quickref->Quickreference.help->Python'sownhelpsystem.objec
我正在使用Celery3.1.16代理(运行RabbitMQ)和多个Celeryworker,celeryd通过主管守护。问题在于任务更新。当我更新我的tasks.py文件时,celeryworker运行旧代码。Celery启动命令:/home/my_project/bin/celery-B--autoreload--app=my_app.celery:appworker--loglevel=INFO我在djangosettings.py中包含任务文件:CELERY_IMPORTS=['my_app.tasks']pyinotify已安装并工作(我猜是这样),celery日志的一部分:
我想要一个可以查看所有未决任务的地方。我不是在谈论注册的函数/类作为任务,而是我可以显示的实际计划作业:名称、task_id、eta、worker等。使用Celery2.0.2和djcelery,我在文档中找到了“inspect”。我试过:fromcelery.task.controlimportinspectdefget_scheduled_tasks(nodes=None):ifnodes:i=inspect(nodes)else:i=inspect()scheduled_tasks=[]dump=i.scheduled()ifdump:forworker,tasksindump:
我发现celery支持任务链:http://celery.readthedocs.org/en/latest/userguide/canvas.html#chains.问题是:如何停止链在任务中的执行?例如,我们得到了一个包含N个项目的链(N>2)。在第二个任务中,我们意识到我们不需要执行所有其余任务。怎么办? 最佳答案 在较新版本的celery(3.1.6)中,您可以通过简单地遍历链并依次撤销每个项目来撤销整个链。#Buildachainforresultsfromtasksimportadddfromceleryimportch
我们正在使用celery进行第三方http调用。我们有大约100多个简单地调用第三方HTTPAPI调用的任务。有些任务会批量调用API,例如凌晨4点发出50万个请求,而有些任务是连续的API调用流,几乎每秒接收一次或两次请求。大多数API调用响应时间在500-800毫秒之间。我们发现celery的交货速度非常慢。对于上述大多数任务,最大交付率在100/s(最大)到接近1/s(最小)之间。我相信这很糟糕,肯定有问题,但我无法弄清楚它是什么。我们从3台服务器的集群开始,逐渐将其变成7台服务器的集群,但没有任何改进。我们尝试了不同的并发设置,从自动缩放到固定的10、20、50、100个工作人
我需要找到如何指定一种初始celery任务,它将以特殊定义的方式启动所有其他任务。这个初始任务应该在celery服务器启动时立即运行,并且永远不会再次运行。 最佳答案 如何使用celeryd_after_setup或celeryd_init信号?文档中的以下示例代码:fromcelery.signalsimportceleryd_init@celeryd_init.connect(sender='worker12@example.com')defconfigure_worker12(conf=None,**kwargs):...
这是我们目前拥有的:我们正在尝试获取缓存的Django模型实例,缓存键包括模型名称和实例ID。使用Django的标准memcached后端。此程序是非常广泛使用的常用程序的一部分,不仅在celery中。有时(随机和/或很少)cache.get(key)返回错误的对象:int或不同的模型实例,甚至出现相同模型不同ID的情况。我们通过检查模型名称和ID与缓存键的对应关系来捕捉这一点。bug只出现在我们三个celery任务的上下文中,从不在pythonshell或其他celery任务中重现。UPD:仅出现在长时间运行的CPU-RAM密集型任务下缓存存储正确的值(我们在错误刚刚出现时手动检查)
我开始使用Celery和Python,我有一个问题可能很简单,但我似乎找不到任何合适的答案......如果我有一堆任务,其中一个抛出异常,有没有办法检索传递给所述任务的参数?例如,如果我想获取一些主机名解析到的IP,然后创建一个任务...@tasks_app.taskdefresolve_hostname(hostname):return(hostname,{hst.addressforhstindns.resolver.query(hostname)})...这可能会引发异常,有没有办法在异常发生时在调用之外获取该hostname参数的值?假设我将任务分组如下:ip_subtasks
我想使用Celery在具有四张Tesla卡的GPU服务器上运行作业。我用一个由四个worker组成的池运行Celeryworker,这样每张卡总是运行一项工作。我的问题是如何指示worker每人领取一个GPU。目前我依赖于工作进程都应该有连续进程ID的假设:device_id=os.getpid()%self.ndevices但是,我不能保证它总是有效,即当工作进程随着时间的推移重新启动时。所以理想情况下,我想直接获取每个worker的ID。有人可以告诉我是否可以从任务中检查工作人员,或者可以建议一种不同的解决方案来跨GPU分配作业? 最佳答案
所以我有一个Django应用程序,它偶尔会向Celery发送一个任务以进行异步执行。我发现当我在开发中处理我的代码时,Django开发服务器知道如何自动检测代码何时发生更改,然后重新启动服务器以便我可以看到我的更改。但是,我的应用程序的RabbitMQ/Celery部分没有接受这些开发中的变化。如果我更改稍后将在Celery任务中运行的代码,Celery仍将继续运行旧版本的代码。我能让它接受变化的唯一方法是:停止celeryworker停止RabbitMQ重置RabbitMQ启动RabbitMQ将用户添加到我的Django应用配置为使用的RabbitMQ为此用户设置适当的权限重启Cel