草庐IT

python - celery 节拍队列包括过时的任务

我在Django中使用周期性的celery任务。我曾经在我的app/tasks.py文件中有以下任务:@periodic_task(run_every=timedelta(minutes=2))defstuff():...但是现在这个任务已经从我的app/tasks.py文件中删除了。但是,我一直在celery日志中看到对这个任务的调用:[2013-05-2107:08:37,963:ERROR/MainProcess]Receivedunregisteredtaskoftypeu'app.tasks.stuff'.我使用的celerybeatscheduler似乎没有更新它的队列。这

python - 为多个 celery worker 和线程正确设置 Flask-SQLAlchemy

当有多个celeryworker涉及多个线程并且都查询同一个数据库时,我正在努力使我的Flask、SQLAlchemy(mysql)和Celery设置正常工作。问题是我无法弄清楚如何以及在何处应用所需的更改,这些更改为flask应用程序和每个celeryworker提供了一个独立的数据库对象。据我所知,需要单独的session来避免讨厌的数据库错误,例如阻止其他数据库查询的不完整事务。这是我目前的项目结构/flask_celery.pyfromceleryimportCelerydefmake_celery(app):celery=Celery(app.import_name,back

python - 如何使用 Celery 延迟任务?

不是说delay方法。我希望能够获得一个任务,给定它的task_id并在执行之前即时更改它的ETA。现在我必须取消它,并重新安排一个。如果计划的过程涉及很多东西,那就麻烦了。 最佳答案 您应该在celery/任务队列之外存储一些“暂停”值。我用一个使用celery的postman来做这件事。我可以通过在memcache或mysql中设置值来暂停部分系统。然后任务确保在执行任务之前查询外部资源。如果它打算暂停,它会设置它执行task.retry()导致它经历重试延迟时间等。 关于pytho

python - Celery 的类似信号量的机制

我们正在为我们的任务队列开发一个使用Python+Celery的分布式应用程序。我们的应用程序要求我们通过IMAP(例如:gmail)从远程ISP下载电子邮件,我们希望能够并行完成此任务。对于给定的电子邮件帐户,您被授予有限数量的模拟连接,因此我们需要一种方法来自动跟踪所有正在下载的帐户的事件连接。我已经找到了多个使用Redis的Celery原子锁示例,但是没有一个可以跟踪像这样的有限资源池,并且所有实现我们自己的尝试都导致难以调试竞争条件,导致我们的锁间歇性地永远不会被释放。 最佳答案 由于celery使用multiprocess

python - 为每个 celery worker 创建单独的数据库连接

当工作人员在创建后执行任务时,我不断遇到奇怪的mysql问题。我们使用django1.3、celery3.1.17、djorm-ext-pool0.5我们以并发3启动celery进程。到目前为止,我的观察是,当工作进程启动时,它们都获得相同的mysql连接。我们记录数据库连接ID,如下所示。fromdjango.dbimportconnectionconnection.cursor()logger.info("Task%sprocessingwithdbconnection%s",str(task_id),str(connection.connection.thread_id()))当

python - 集成测试多个 Celery Worker 和一个数据库支持的 Django API

我正在使用一个面向软件的架构,它有多个celeryworker(我们称它们为worker1、worker2和worker3)。所有三个工作人员都是独立的实体(即,独立的代码库、独立的repos、独立的celery实例、独立的机器),它们都没有连接到Django应用程序。通过基于Django、MySQL支持的RESTfulAPI与这三个工作人员中的每一个进行通信。在开发中,这些服务都在一个vagrantbox上,每一个都作为一个独立的机器运行在一个单独的端口上。我们有一个RabbitMQ代理来处理所有Celery任务。通过这些服务的典型路径可能如下所示:worker1从设备获取消息,进行

python - Celery/RabbitMQ unacked 消息阻塞队列?

我已经调用了一个使用urllib2远程获取一些信息的任务数千次。这些任务是用随机eta安排的(一周内),因此它们不会同时访问服务器。有时我会收到404,有时不会。我正在处理错误以防万一。在RabbitMQ控制台中,我可以看到16条未确认的消息:我停止了celery,清除了队列并重新启动了它。16条未确认的消息仍然存在。我还有其他任务进入同一个队列,但也没有执行。清除后,我尝试提交另一个任务,它的状态仍然是就绪:有什么办法可以找出消息未被确认的原因吗?版本:celery==3.1.4{rabbit,"RabbitMQ","3.5.3"}celeryapp.pyCELERYBEAT_SCH

python - 如何从 Celery worker 捕获自定义异常,或阻止它以 `celery.backends.base` 为前缀?

我的Celery任务引发自定义异常NonTransientProcessingError,然后被AsyncResult.get()捕获.任务.py:classNonTransientProcessingError(Exception):pass@shared_task()defthrow_exception():raiseNonTransientProcessingError('ErrorraisedbyPOCmodelfortestpurposes')在Python控制台中:frommy_app.tasksimport*r=throw_exception.apply_async()t

python - Flask with Celery - 应用程序上下文不可用

我有一个Flask应用程序,注册如下:APP=Flask(__name__)APP.config.from_object('config')我已经为URL定义了一个View,其中调用了一个与数据库交互的函数。fromtasksimportsome_func...some_func.delay(params)在tasks.py文件中,我正在创建一个Celery实例,如下所示:#Configvaluesdon'tworkherecelery=Celery('tasks',broker='amqp://',backend='amqp://')...@celery.task()defsome_

python - 如何解决这个错误? django+celery+rabbitmq+mysql+redis中的 "RestartFreqExceeded: 5 in 1s"

所以我将django与celery一起使用。rabbitmq是经纪人。redis是缓存。mysql是数据库。(本地主机中的所有内容)我正在使用python2.7并使用基于virtualenv的虚拟环境我在默认端口启动redis服务器(本地)在一个新的终端中,我运行pythonmanage.pyrunserver在一个新终端中,我像这样启动celerycelery-Ads_djangoworker-B-lwarning这个程序曾经有效。但是现在当我运行celery命令时,我得到以下内容[2016-07-1209:15:20,113:CRITICAL/MainProcess]Frequen