草庐IT

worker-pool

全部标签

python - 为每个 celery worker 创建单独的数据库连接

当工作人员在创建后执行任务时,我不断遇到奇怪的mysql问题。我们使用django1.3、celery3.1.17、djorm-ext-pool0.5我们以并发3启动celery进程。到目前为止,我的观察是,当工作进程启动时,它们都获得相同的mysql连接。我们记录数据库连接ID,如下所示。fromdjango.dbimportconnectionconnection.cursor()logger.info("Task%sprocessingwithdbconnection%s",str(task_id),str(connection.connection.thread_id()))当

python - 集成测试多个 Celery Worker 和一个数据库支持的 Django API

我正在使用一个面向软件的架构,它有多个celeryworker(我们称它们为worker1、worker2和worker3)。所有三个工作人员都是独立的实体(即,独立的代码库、独立的repos、独立的celery实例、独立的机器),它们都没有连接到Django应用程序。通过基于Django、MySQL支持的RESTfulAPI与这三个工作人员中的每一个进行通信。在开发中,这些服务都在一个vagrantbox上,每一个都作为一个独立的机器运行在一个单独的端口上。我们有一个RabbitMQ代理来处理所有Celery任务。通过这些服务的典型路径可能如下所示:worker1从设备获取消息,进行

python - 如何从 Celery worker 捕获自定义异常,或阻止它以 `celery.backends.base` 为前缀?

我的Celery任务引发自定义异常NonTransientProcessingError,然后被AsyncResult.get()捕获.任务.py:classNonTransientProcessingError(Exception):pass@shared_task()defthrow_exception():raiseNonTransientProcessingError('ErrorraisedbyPOCmodelfortestpurposes')在Python控制台中:frommy_app.tasksimport*r=throw_exception.apply_async()t

python - 为什么我不能在 multiprocessing.Pool 中使用 operator.itemgetter?

以下程序:importmultiprocessing,operatorf=operator.itemgetter(0)#deff(*a):returnoperator.itemgetter(0)(*a)if__name__=='__main__':multiprocessing.Pool(1).map(f,["ab"])失败并出现以下错误:ProcessPoolWorker-1:Traceback(mostrecentcalllast):File"/usr/lib/python3.2/multiprocessing/process.py",line267,in_bootstrapsel

python - celery 可以分配任务以指定 worker

Celery会发送任务给空闲的worker。我有一个任务每5秒运行一次,我希望这个任务只发送给一个指定的工作人员。其他任务可以共享剩下的workercelery可以这样做吗??我想知道这个参数是什么:CELERY_TASK_RESULT_EXPIRES是不是表示任务不会发给队列中的某个worker?或者如果运行时间太长它会停止任务吗? 最佳答案 当然可以。最好的方法是使用不同的队列将celeryworker分开。您只需要确保您需要的任务进入单独的队列,并且您的工作人员正在监听特定的队列。长话短说:http://docs.celery

Python multiprocessing.Pool : AttributeError

我在一个类中有一个方法需要在一个循环中做很多工作,我想将这些工作分散到我的所有核心上。我写了下面的代码,如果我使用普通的map(),它可以工作,但是使用pool.map()会返回一个错误。importmultiprocessingpool=multiprocessing.Pool(multiprocessing.cpu_count()-1)classOtherClass:defrun(sentence,graph):returnFalseclassSomeClass:def__init__(self):self.sentences=[["Somestring"]]self.graphs

python - 在一台机器上的 celery worker 之间共享内存区域

我想在celery中的工作节点之间共享小块信息(例如缓存的授权token、统计信息等)。如果我在我的任务文件中创建一个全局变量,它对每个工作人员都是唯一的(我的工作人员是进程,并且有1个任务/执行的生命周期)。最佳做法是什么?我是否应该在外部保存状态(DB),创建一个老式的共享内存(由于celery中不同的池实现可能很困难)?提前致谢! 最佳答案 我终于找到了一个像样的解决方案——corepythonmultiprocessing-Manager:frommultiprocessingimportManagermanag=Manag

python - multiprocessing.Pool.imap_unordered 的内存使用量稳步增长

我刚刚注意到我的程序在处理一个大文件时使用了越来越多的内存。不过,它一次只处理一行,所以我不明白为什么它会继续使用更多内存。经过大量挖掘,我意识到该程序分为三个部分:加载数据,一次一行。使用imap_unordered()处理multiprocessing.Pool中的每一行。在单个线程中处理每一行。如果第1步和第2步比第3步快,那么池worker的结果将排队,消耗内存。我如何限制在第2步中输入池中的数据,使其不会在第3步中领先于消费者?这看起来类似于anothermultiprocessingquestion,但我不清楚这个问题的延迟在哪里。这是一个演示问题的小例子:importlo

python - PyTorch 数据加载器中的 "number of workers"参数实际上是如何工作的?

如果num_workers为2,这是否意味着它会将2个批处理放入RAM并将其中的1个发送到GPU还是将3个批处理放入RAM然后将其中的1个发送到GPU?当worker数量高于CPU核心数量时,实际会发生什么情况?我试过了,效果很好,但它是如何工作的?(我以为我可以选择的最大worker数量是核心数)。如果我将num_workers设置为3,并且在训练期间GPU的内存中没有批处理,主进程是等待其工作人员读取批处理还是读取单个批处理(无需等待worker)? 最佳答案 当num_workers>0时,只有这些worker会检索数据,主进

python - 将接受类成员函数作为变量的函数传递给python multiprocess pool.map()

嗨,我上午的大部分时间都在为这个问题苦苦挣扎,希望有人能给我指出正确的方向。这是我目前的代码:deff(tup):returnsome_complex_function(*tup)defmain():pool=Pool(processes=4)#importandprocessdataomitted_args=[(x.some_func1,.05,x.some_func2)forxinlist_of_some_class]results=pool.map(f,_args)printresults我得到的第一个错误是:>ExceptioninthreadThread-2:Tracebac