草庐IT

worker-pool

全部标签

python - TensorFlow Master 和 Worker 服务

我想了解主服务和辅助服务在TensorFlow中的确切角色。到目前为止,我了解到我启动的每个TensorFlow任务都与一个tf.train.Server实例相关联。此实例通过实现tensorflow::Session导出“主服务”和“辅助服务”接口(interface)”(master)和worker_service.proto(worker)。第一个问题:我的意思是,一项任务只与一名工作人员相关联吗?此外,我明白了.........关于大师:它是主服务的范围......(1)...向客户端提供功能,以便客户端可以运行session。(2)...将工作委派给可用的工作人员以计算ses

python - 从 celery 任务中获取 celery worker 的名字?

我希望celery任务能够获取执行它的工作人员的姓名,以便进行日志记录。我需要在任务中处理这个问题,而不是直接查询代理。有没有办法做到这一点?如果重要的话,我正在将celery与RabbitMQ一起使用。 最佳答案 使用celeryd_after_setup信号像这样捕获worker名称:fromcelery.signalsimportceleryd_after_setup@celeryd_after_setup.connectdefcapture_worker_name(sender,instance,**kwargs):os.e

Python multiprocessing——跟踪pool.map操作的过程

我有一个函数可以执行一些模拟和返回字符串格式的数组。我想运行模拟(功能)不同的输入参数值,超过10000个可能的输入值,并将结果写入单个文件。我正在使用多处理,特别是pool.map函数并行运行模拟。全程模拟功能运行超过10000次需要很长时间,我很想跟踪整个操作过程。我认为下面当前代码中的问题是,pool.map运行函数10000次,在这些操作期间没有任何进程跟踪。一旦并行处理完成运行10000次模拟(可能是几小时到几天),然后我会继续跟踪10000次模拟结果何时被保存到文件中。所以这并不是真正跟踪pool.map操作的处理。我的代码是否有允许进程跟踪的简单修复?defsimFunc

python - concurrent.futures.ProcessPoolExecutor 与 multiprocessing.pool.Pool

这个问题在这里已经有了答案:Concurrent.futuresvsMultiprocessinginPython3(6个答案)关闭5年前。请给我解释一下这两个类有什么区别?concurrent.futures.ProcessPoolExecutormultiprocessing.pool.Pool我注意到Python2中存在multiprocessing模块。但是功能上呢?

python - 利用 "Copy-on-Write"将数据复制到 Multiprocessing.Pool() 工作进程

我有一些multiprocessingPython代码,看起来有点像这样:importtimefrommultiprocessingimportPoolimportnumpyasnpclassMyClass(object):def__init__(self):self.myAttribute=np.zeros(100000000)#basicallyabigmemorystructdefmy_multithreaded_analysis(self):arg_lists=[(self,i)foriinrange(10)]pool=Pool(processes=10)result=pool

worker 中的 Python 多处理和处理异常

我将python多处理库用于一种算法,其中我有许多工作人员处理某些数据并将结果返回给父进程。我使用multiprocessing.Queue将作业传递给工作人员,然后收集结果。一切都很好,直到worker无法处理某些数据block。在下面的简化示例中,每个工作人员都有两个阶段:初始化-可能会失败,在这种情况下worker应该被销毁数据处理——处理一block数据可能会失败,在这种情况下,worker应该跳过这个block并继续处理下一个数据。当这两个阶段中的任何一个失败时,我都会在脚本完成后陷入僵局。此代码模拟了我的问题:importmultiprocessingasmpimportr

python - 无法启动 Airflow worker/flower,需要澄清 Airflow 架构以确认安装正确

在不同的机器上运行worker会导致下面指定的错误。我关注了theconfigurationinstructions并同步dags文件夹。我还要确认一下,RabbitMQ和PostgreSQL只需要安装在Airflow核心机上,不需要安装在worker上(worker只连接core)。设置规范详述如下:Airflow核心/服务器计算机已安装以下内容:Python2.7与Airflow(AIRFLOW_HOME=~/airflow)celery心理治疗师2RabbitMQPostgreSQLairflow.cfg中的配置:sql_alchemy_conn=postgresql+psyco

python - Celery Worker 中的多线程

我正在使用Celery和RabbitMQ来处理来自API请求的数据。流程如下:请求>API>RabbitMQ>CeleryWorker>返回理想情况下,我会产生更多的celeryworker,但我受限于内存限制。目前,我的流程中的瓶颈是从传递给工作人员的URL中获取和下载数据。粗略的,流程大概是这样的:defcelery_gets_job(url):data=fetches_url(url)#takes0.1sto1.0s(bottleneck)result=processes_data(data)#takes0.1sreturnresult这是NotAcceptable,因为工作人员

python - 调用 celery add_consumer 后 worker 不消费任务

我想利用Celery(使用RabbitMQ作为后端MQ)通过不同的队列执行不同风格的任务。一个要求是来自特定队列的(由工作人员)消费应该具有暂停和恢复的能力。celery,好像有thiscapability通过调用add_consumer和cancel_consumer。虽然我能够取消特定工作人员队列中任务的消费,但我无法通过调用add_consumer让工作人员恢复消费。Thecodetoreproducethisissueisprovidedhere.我的猜测可能是我缺少某种在celeryconfig中或在启动worker时通过参数提供的参数?如果能对此有一些新的看法,那就太好了。

python - 如何布局队列/ worker 结构以支持多种环境的大型任务?

对于基于Python/Django/Celery的部署工具,我们有以下设置:我们目前使用默认的Celery设置。(一个队列+交换称为“celery”。)队列中的每个任务代表一个部署操作。环境的每项任务都以可能需要(非常)长的同步阶段结束。需要满足以下规范:并发性:多个环境的任务应该同时执行。锁定:可能至多每个环境同时运行任务(即环境锁定)。吞吐量优化:当单个环境有多个任务时,可以将它们的同步阶段合并起来进行优化。因此,如果任务接近尾声,它应该检查队列中是否有新任务等待此环境,如果有,则跳过其同步阶段。实现它的首选方法是什么?一些想法:我会说我们必须设置多个队列:每个环境一个,并让N个c