草庐IT

celery_worker

全部标签

python - Celery:运行冗长初始化函数的正确方法(每个进程)

TLDR;要为celery生成的每个进程运行初始化函数,您可以使用worker_process_init信号。正如您在docs中看到的那样,该信号的处理程序不应阻塞超过4秒。但是,如果我必须运行一个执行时间超过4秒的init函数,有什么选择呢?问题我使用C扩展模块在celery任务中运行某些操作。该模块需要初始化,可能需要几秒钟(可能是4-10秒)。因为我宁愿不为每个任务运行这个init函数,而是为每个生成的进程运行,我使用了worker_process_init信号:#lib.pyimportisclient#cextensionmoduleclient=Nonedefinit():

python - 使用 worker 的多处理池

我编写了以下代码来让我懒惰的第二个CPU核心工作。代码所做的基本上是首先在目录层次结构中找到所需的“海”文件,然后执行一组外部脚本来处理这些二进制“海”文件,以生成50到100个文本和二进制文件。正如问题的标题所建议的那样,以并行方式提高处理速度。这个问题源于我们在IPython用户列表上进行的长时间讨论,标题为“Cannotstartipcluster”。从我对IPython的并行处理功能的实验开始。问题是我无法让这段代码正确运行。如果包含“sea”文件的文件夹仅包含“sea”文件,脚本将完成其执行而不完全执行外部脚本运行。(假设我有30-50个外部脚本要运行,但我的多处理启用脚本仅

python - 任务状态和 django-celery

我使用django-celery并有这样的任务:classTestTask(Task):name="enabler.test_task"defrun(self,**kw):debug_log("begintesttask")time.sleep(5)debug_log("endtesttask")defon_success(self,retval,task_id,args,kwargs):debug_log("onsuccess")defon_failure(self,retval,task_id,args,kwargs):debug_log("onfailure")我使用django

python - Django Celery 实现 - OSError : [Errno 38] Function not implemented

我安装了django-celery并尝试启动工作服务器,但我收到一个OSError,表示某个功能未实现。我在VPS上运行CentOS5.4版(最终版):.broker->amqp://guest@localhost:5672/.queues->.celery->exchange:celery(direct)binding:celery.concurrency->4.loader->djcelery.loaders.DjangoLoader.logfile->[stderr]@WARNING.events->OFF.beat->OFF[2010-07-2217:10:01,364:WAR

python - TensorFlow Master 和 Worker 服务

我想了解主服务和辅助服务在TensorFlow中的确切角色。到目前为止,我了解到我启动的每个TensorFlow任务都与一个tf.train.Server实例相关联。此实例通过实现tensorflow::Session导出“主服务”和“辅助服务”接口(interface)”(master)和worker_service.proto(worker)。第一个问题:我的意思是,一项任务只与一名工作人员相关联吗?此外,我明白了.........关于大师:它是主服务的范围......(1)...向客户端提供功能,以便客户端可以运行session。(2)...将工作委派给可用的工作人员以计算ses

python - Celery + Eventlet + 非阻塞请求

我在celeryworkers中使用Pythonrequests进行大量(~10/秒)API调用(包括GET、POST、PUT、DELETE)。每个请求大约需要5-10秒才能完成。我尝试在eventlet池中运行celeryworker,并发数为1000。由于请求是阻塞进程,每个并发连接都在等待一个请求。如何使请求异步? 最佳答案 使用eventletmonkeypatching使任何纯Python库成为非阻塞的。补丁单库#importrequests#insteaddothis:importeventletrequests=eve

python - 如何创建 Celery Windows 服务?

我正在尝试创建一个Windows服务来启动Celery。我遇到过一篇使用TaskScheduler做的文章.然而,它似乎启动了许多celery实例并不断消耗内存直到机器死机。有什么方法可以将其作为Windows服务启动吗? 最佳答案 我从另一个网站得到了答案。Celeryd(Celery的守护程序服务)作为粘贴应用程序运行,搜索“PasterWindows服务”引导我here.它描述了如何将Pylons应用程序作为Windows服务运行。作为paster框架和托管python网络服务的新手,我一开始并没有想过要检查它。但该解决方案适

python - 从 celery 任务中获取 celery worker 的名字?

我希望celery任务能够获取执行它的工作人员的姓名,以便进行日志记录。我需要在任务中处理这个问题,而不是直接查询代理。有没有办法做到这一点?如果重要的话,我正在将celery与RabbitMQ一起使用。 最佳答案 使用celeryd_after_setup信号像这样捕获worker名称:fromcelery.signalsimportceleryd_after_setup@celeryd_after_setup.connectdefcapture_worker_name(sender,instance,**kwargs):os.e

python - Celery 作为网络发布/订阅事件

我想设置一个网络发布/订阅事件系统,但还需要能够异步运行任务。我曾尝试让celery来完成繁重的工作,但我觉得我正在尝试填充一大堆东西来让它工作。我有两台机器(输入和输出),它们都可以访问RabbitMQ。我想让一个主程序启动一个等待输入的循环(网络摄像头检测到的运动)。我已经设置了input_machine启动main.py,它启动了一个celery任务,该任务由input_machine上的工作人员监控到“输入”队列。这个任务只运行whileTrue循环,直到检测到一些输入,然后它调用另一个名为('project.entered_room',什么都不做)celery任务到“输出”队

python - 如何处理 celery 的 Task.map 中的错误

假设我有两个celery任务:@celery.taskdefrun_flakey_things(*args,**kwargs):returnrun_flakey_and_synchronous_thing.map(xrange(10)).apply_async()@celery.taskdefrun_flakey_and_synchronous_thing(a):ifa%5:returnaraiseRuntimeError(a)因此,当您运行run_flakey_things时,它会立即失败,因为序列中的第一项会引发异常。我想要的是像map一样按顺序为序列中的所有项目运行任务,但继续运