草庐IT

worker-pool

全部标签

python - Django celery : Passing request Object to worker

我如何将django请求对象传递给celeryworker。当尝试传递请求对象时,它会抛出一个错误Can'tPickleInputObjects似乎celery序列化了传递给worker的所有参数。我尝试使用其他序列化方法,如JSON。CELERY_TASK_SERIALIZER="JSON"但它不起作用。是否可以配置celery使其不序列化数据。或者我可以在传递给工作人员之前将请求对象转换为字符串,然后再次转换回工作人员中的对象。提前致谢... 最佳答案 你不能pickleDjango的请求对象(更多细节见thisquestion

python - 如何使用同一个 worker 重试 celery ?

我刚刚开始在Django项目中使用celery,并且有点陷入这个特定问题:基本上,我需要将一个长时间运行的任务分配给不同的工作人员。该任务实际上分为几个步骤,每个步骤都需要相当长的时间才能完成。因此,如果某个步骤失败,我希望celery使用同一个worker重试此任务,以重用已完成步骤的结果。我知道celery使用路由将任务分发到某个服务器,但我找不到关于这个特定问题的任何信息。我使用RabbitMQ作为我的代理。 最佳答案 您可以让每个celeryd实例从以工作人员的主机名命名的队列中消费:celeryd-linfo-nworke

python - celery worker 挂起没有任何错误

我有一个运行celeryworker的生产设置,用于向远程服务发出POST/GET请求并存储结果,它每15分钟处理大约20k个任务。问题是worker无缘无故地NumPy,没有错误,没有警告。我也尝试添加多处理,结果相同。在日志中我看到执行任务的时间在增加,就像在s中成功有关更多详细信息,请参阅https://github.com/celery/celery/issues/2621 最佳答案 如果您的celeryworker有时卡住,您可以使用strace&lsof找出它卡在哪个系统调用处。例如:$strace-p10268-s10

python multiprocessing.Pool kill *特定*长时间运行或挂起的进程

我需要执行一个包含许多并行数据库连接和查询的池。我想使用multiprocessing.Pool或concurrent.futuresProcessPoolExecutor。Python2.7.5在某些情况下,查询请求花费的时间太长或永远不会完成(挂起/僵尸进程)。我想从超时的multiprocessing.Pool或concurrent.futuresProcessPoolExecutor中终止特定进程。这是一个如何终止/重新生成整个进程池的示例,但理想情况下我会尽量减少CPU抖动,因为我只想终止在超时秒后未返回数据的特定长时间运行的进程。由于某些原因,在返回并完成所有结果后,下面的

python - 我应该每次都创建一个新的 Pool 对象还是重复使用一个?

我正在尝试了解Python的multiprocessing.Pool对象的最佳实践。在我的程序中,我经常使用Pool.imap。通常,每次我并行启动任务时,我都会创建一个新的池对象,然后在完成后将其关闭。我最近遇到了提交到池中的任务数少于进程数的挂起。奇怪的是,它只发生在我的测试管道中,它之前运行了很多东西。单独运行测试并没有导致手牌。我认为这与制作多个池有关。我真的很想找到一些资源来帮助我了解使用Python多处理的最佳实践。具体来说,我目前正在尝试了解制作多个池对象与仅使用一个池对象的含义。 最佳答案 当您创建一个工作进程池时,

python - 在 Gunicorn workers 之间共享一个对象,或者在一个 worker 中持久化一个对象

我正在使用Nginx/Gunicorn/Bottle堆栈编写一个WSGI应用程序,它接受GET请求,返回一个简单的响应,然后将消息写入RabbitMQ。如果我直接通过Bottle运行应用程序,那么每次应用程序收到GET时我都会重新使用RabbitMQ连接。但是,在Gunicorn中,看起来工作人员每次都在破坏和重新创建MQ连接。我想知道是否有重用该连接的好方法。更详细的信息:##ThisismybottleappfrombottleimportblahblahblahimportbottlefrommqconnectorimportMQConnectormqc=MQConnector(

python - `context` 中的 `multiprocessing.pool.Pool` 参数是什么意思?

context是classmultiprocessing.pool.Pool构造函数中的可选参数。Documentation只说:contextcanbeusedtospecifythecontextusedforstartingtheworkerprocesses.Usuallyapooliscreatedusingthefunctionmultiprocessing.Pool()orthePool()methodofacontextobject.Inbothcasescontextissetappropriately.它没有阐明什么是“上下文对象”,为什么classPool构造函数

python - 如何杀死多进程中的所有 Pool worker?

我想停止一个worker的所有线程。我有一个有10个worker的线程池:defmyfunction(i):print(i)if(i==20):sys.exit()p=multiprocessing.Pool(10,init_worker)foriinrange(100):p.apply_async(myfunction,(i,))我的程序不会停止,其他进程会继续工作,直到完成所有100次迭代。我想从调用sys.exit()的线程内部完全停止池。目前的编写方式只会停止调用sys.exit()的worker。 最佳答案 这没有按您预期

Python 多处理 pool.map 引发 IndexError

我已经使用python/cython开发了一个实用程序,可以对CSV文件进行排序并为客户端生成统计信息,但是调用pool.map似乎会在我的映射函数有机会执行之前引发异常。对少量文件进行排序似乎按预期运行,但随着文件数量增加到10,我在调用pool.map后得到以下IndexError。有没有人碰巧认识到以下错误?非常感谢任何帮助。虽然代码处于NDA之下,但用例非常简单:代码示例:defsort_files(csv_files):pool_size=multiprocessing.cpu_count()pool=multiprocessing.Pool(processes=pool_s

python - Gunicorn 在多处理进程和 worker 之间共享内存

我有一个python应用程序,它使用字典作为多个进程之间的共享内存:frommultiprocessingimportManagermanager=Manager()shared_dict=manager.dict()RESTAPI是使用Flask实现的。在使用pywsgi或简单地Flask.run初始化Flask服务器时,一切正常。我决定加入混合gunicorn。现在,当我从任何工作人员访问这个共享字典时(即使只有一个工作人员正在运行)我得到错误:message=connection.recv_bytes(256)#rejectlargemessageIOError:[Errno35