草庐IT

mjob_worker

全部标签

python - 如何从 Celery worker 捕获自定义异常,或阻止它以 `celery.backends.base` 为前缀?

我的Celery任务引发自定义异常NonTransientProcessingError,然后被AsyncResult.get()捕获.任务.py:classNonTransientProcessingError(Exception):pass@shared_task()defthrow_exception():raiseNonTransientProcessingError('ErrorraisedbyPOCmodelfortestpurposes')在Python控制台中:frommy_app.tasksimport*r=throw_exception.apply_async()t

python - celery 可以分配任务以指定 worker

Celery会发送任务给空闲的worker。我有一个任务每5秒运行一次,我希望这个任务只发送给一个指定的工作人员。其他任务可以共享剩下的workercelery可以这样做吗??我想知道这个参数是什么:CELERY_TASK_RESULT_EXPIRES是不是表示任务不会发给队列中的某个worker?或者如果运行时间太长它会停止任务吗? 最佳答案 当然可以。最好的方法是使用不同的队列将celeryworker分开。您只需要确保您需要的任务进入单独的队列,并且您的工作人员正在监听特定的队列。长话短说:http://docs.celery

python - 在一台机器上的 celery worker 之间共享内存区域

我想在celery中的工作节点之间共享小块信息(例如缓存的授权token、统计信息等)。如果我在我的任务文件中创建一个全局变量,它对每个工作人员都是唯一的(我的工作人员是进程,并且有1个任务/执行的生命周期)。最佳做法是什么?我是否应该在外部保存状态(DB),创建一个老式的共享内存(由于celery中不同的池实现可能很困难)?提前致谢! 最佳答案 我终于找到了一个像样的解决方案——corepythonmultiprocessing-Manager:frommultiprocessingimportManagermanag=Manag

python - PyTorch 数据加载器中的 "number of workers"参数实际上是如何工作的?

如果num_workers为2,这是否意味着它会将2个批处理放入RAM并将其中的1个发送到GPU还是将3个批处理放入RAM然后将其中的1个发送到GPU?当worker数量高于CPU核心数量时,实际会发生什么情况?我试过了,效果很好,但它是如何工作的?(我以为我可以选择的最大worker数量是核心数)。如果我将num_workers设置为3,并且在训练期间GPU的内存中没有批处理,主进程是等待其工作人员读取批处理还是读取单个批处理(无需等待worker)? 最佳答案 当num_workers>0时,只有这些worker会检索数据,主进

python - celery worker 在调用 retry() 后不重试任务

我有一个任务:@celery.task(name='request_task',default_retry_delay=2,acks_late=True)defrequest_task(data):try:ifsome_condition:request_task.retry()exceptExceptionase:request_task.retry()我使用带有mongodb代理和mongodb结果后端的celery。当调用任务的retry()方法时,无论是从条件语句还是在捕获异常之后,任务都不会重试。在工作人员的终端中,我收到这样的消息:[2012-08-1019:21:54,9

python - 运行连接到 Django 测试数据库的 Celery worker

背景:我正在开展一个项目,该项目使用Celery来安排将在未来特定时间运行的任务。这些任务插入最终状态机的状态向前发展。这是一个例子:计划在2天内向用户发送future提醒。当该计划任务运行时,会发送一封电子邮件,并且FSM会前进到下一个状态下一个状态是安排提醒在另外两天运行当这个任务运行时,它会发送另一封邮件,高级状态等等...我目前正在按照thisSOanswer的建议使用CELERY_ALWAYS_EAGER在测试中使用该技术的问题在于,本应在单独线程中运行的任务代码与调度它的线程在同一线程中运行。这会导致FSM状态无法正确保存,并且难以测试。我无法确定究竟是什么导致了它,但似乎

Python Celery - worker 忽略日志级别信息

我正在使用以下命令:celeryworker-linfo-Adjango_app--concurrency=10--autoreload但是DEBUG日志还是不断涌出,使用-lwarning和--logfile也是如此知道为什么Celery会忽略日志设置吗?更多详情:日志来自Python库suds,它使用DEBUG输出到记录器。 最佳答案 我遇到了同样的问题,我决定在settings.py中调整日志级别:LOGGING['loggers']['celery']={'handlers':['console',],'level':,'p

python - gunicorn gevent worker 修补了哪些功能?

我正在使用gunicorn运行一些flask和django应用程序,使用geventworker,有一些问题...首先,我假设因为gunicornfork并实例化我的进程,它会猴子修补标准模块,我不必自己调用monkey.patch_all,它已经为我完成,并且每个请求都作为greenlet运行,对吗?其次,这是重要的部分,gunicorn-gevent真的给猴子修补了哪些功能?当您使用gevent时,您始终可以选择要修补的功能(套接字、补丁、urllib)...所以,问题是,这些特性中的哪些是真的得到了gunicorn-gevent的猴子补丁?我如何更改此列表?例如,对monkey.

python - 不断从 celery worker 那里检索结果

我有一个web应用程序,我试图在其中使用celery从数据库加载后台任务。我目前正在根据要求加载数据库,但希望以每小时为间隔加载任务并让它们在后台工作。我正在使用flask并在python中编码。我也在运行redis。到目前为止,我已经使用celery让工作人员处理任务,并让节拍定期将任务发送给工作人员。但是我想从工作人员那里检索结果[一个数据框或查询],如果结果还没有准备好,那么它应该加载工作人员以前的结果。关于如何做到这一点有什么想法吗?编辑我正在使用sqlalchemy从数据库中检索结果,并在网页中呈现结果。我的主页上有各种链接,这些链接都指向不同的图表,我想在后台加载这些图表,

python - 一旦其中一个 worker 满足特定条件,就终止 Python 多处理程序

我正在使用其多处理模块编写Python程序。该程序调用许多辅助函数,每个辅助函数产生一个随机数。一旦其中一名worker产生了大于0.7的数字,我就需要终止程序。下面是我的程序,其中“如何做”部分尚未填写。任何想法?谢谢。importtimeimportnumpyasnpimportmultiprocessingasmpimporttimeimportsysdeff(i):np.random.seed(int(time.time()+i))time.sleep(3)res=np.random.rand()print"Fromi=",i,"res=",resifres>0.7:print