我需要找到如何指定一种初始celery任务,它将以特殊定义的方式启动所有其他任务。这个初始任务应该在celery服务器启动时立即运行,并且永远不会再次运行。 最佳答案 如何使用celeryd_after_setup或celeryd_init信号?文档中的以下示例代码:fromcelery.signalsimportceleryd_init@celeryd_init.connect(sender='worker12@example.com')defconfigure_worker12(conf=None,**kwargs):...
这是我们目前拥有的:我们正在尝试获取缓存的Django模型实例,缓存键包括模型名称和实例ID。使用Django的标准memcached后端。此程序是非常广泛使用的常用程序的一部分,不仅在celery中。有时(随机和/或很少)cache.get(key)返回错误的对象:int或不同的模型实例,甚至出现相同模型不同ID的情况。我们通过检查模型名称和ID与缓存键的对应关系来捕捉这一点。bug只出现在我们三个celery任务的上下文中,从不在pythonshell或其他celery任务中重现。UPD:仅出现在长时间运行的CPU-RAM密集型任务下缓存存储正确的值(我们在错误刚刚出现时手动检查)
我开始使用Celery和Python,我有一个问题可能很简单,但我似乎找不到任何合适的答案......如果我有一堆任务,其中一个抛出异常,有没有办法检索传递给所述任务的参数?例如,如果我想获取一些主机名解析到的IP,然后创建一个任务...@tasks_app.taskdefresolve_hostname(hostname):return(hostname,{hst.addressforhstindns.resolver.query(hostname)})...这可能会引发异常,有没有办法在异常发生时在调用之外获取该hostname参数的值?假设我将任务分组如下:ip_subtasks
我想使用Celery在具有四张Tesla卡的GPU服务器上运行作业。我用一个由四个worker组成的池运行Celeryworker,这样每张卡总是运行一项工作。我的问题是如何指示worker每人领取一个GPU。目前我依赖于工作进程都应该有连续进程ID的假设:device_id=os.getpid()%self.ndevices但是,我不能保证它总是有效,即当工作进程随着时间的推移重新启动时。所以理想情况下,我想直接获取每个worker的ID。有人可以告诉我是否可以从任务中检查工作人员,或者可以建议一种不同的解决方案来跨GPU分配作业? 最佳答案
所以我有一个Django应用程序,它偶尔会向Celery发送一个任务以进行异步执行。我发现当我在开发中处理我的代码时,Django开发服务器知道如何自动检测代码何时发生更改,然后重新启动服务器以便我可以看到我的更改。但是,我的应用程序的RabbitMQ/Celery部分没有接受这些开发中的变化。如果我更改稍后将在Celery任务中运行的代码,Celery仍将继续运行旧版本的代码。我能让它接受变化的唯一方法是:停止celeryworker停止RabbitMQ重置RabbitMQ启动RabbitMQ将用户添加到我的Django应用配置为使用的RabbitMQ为此用户设置适当的权限重启Cel
我希望Celery任务依赖于2个或更多其他任务的结果。我调查了Python+Celery:Chainingjobs?和http://pypi.python.org/pypi/celery-tasktree,但只有当任务只有一个依赖任务时,这些才是好的。我知道TaskSet,但似乎没有办法在TaskSetResult.ready()变为True时立即执行回调。我现在想到的是有一个周期性任务,每隔几[毫秒]秒左右轮询一次TaskSetResult.ready()并在回调返回True时触发回调,但这对我来说听起来很不雅观。有什么建议吗? 最佳答案
我正在使用Python(2.7.3)中的多处理模块,并且想调试我的工作人员中正在进行的一些事情。但是,我似乎无法在工作线程中捕获任何异常。一个最小的例子:importmultiprocessingasmpa=[1]defworker():printa[2]defpool():pool=mp.Pool(processes=1)pool.apply_async(worker,args=())pool.close()pool.join()print"Multiprocessingdone!"if__name__=='__main__':pool()预计会引发IndexError,但我的输出只
我在Heroku(免费层)上运行Flask应用程序,在使用apply_async安排任务时遇到了一些问题。如果我安排了两个以上的任务,我会得到一个很长的堆栈跟踪,但有一个异常(exception):AccessRefused(403,u"ACCESS_REFUSED-accesstoexchange'celeryresults'invhost'rthtwchf'refusedforuser'rthtwchf'",(40,10),'Exchange.declare')奇怪的是前两个任务(在重新启动我的所有进程之前)似乎总是毫无问题地完成。搜索引擎的一点点调查让我找到了https://st
我想在celery链命令中使用block。chain=task1.s(arg1)|task2.chunks(?,CHUNK_SIZE)|task3.chunks(?,CHUNK_SIZE)基本上我想做的是运行task1,将其结果分block并将分block发送到task2,然后task2应该调用task3,task3也应该从task2接收分block结果以完成该过程。为什么?因为task1和task2都可以返回相当数量的项目,我想分批处理这些项目。上面的代码不起作用,因为我不太确定用什么代替问号才能使其起作用。我不太确定这是否可行,因为搜索没有提供太多结果,所以如果无法构建这样的工作流
我的一些RemoteCelery任务似乎永远不会到达我的经纪人(RabbitMQ)。这似乎是随机发生的。我的日志中没有NO错误,它们永远不会到达工作人员或失败。Flower/Rabbit从不报告任务失败。我使用tcpflow-p-c-ieth0port5672来监控API上发送任务(client)的流量。当API成功发送任务时,外出流量记录如下:(已删除敏感数据)192.018.000.002.42738-052.048.150.171.05672:AMQP052.048.150.171.05672-192.018.000.002.42738:capabilitiesFpublishe