草庐IT

CELERY_ALWAYS_EAGER

全部标签

python - Celery:对具有相同参数的任务进行速率限制

我正在寻找一种方法来限制何时调用函数,但仅限于输入参数不同时,即:@app.task(rate_limit="60/s")defapi_call(user):do_the_api_call()foriinrange(0,100):api_call("antoine")api_call("oscar")所以我希望api_call("antoine")每秒调用60次和api_call("oscar")每秒60次。关于我该怎么做的任何帮助?--编辑27/04/2015我曾尝试在任务中使用rate_limit调用子任务,但它也不起作用:rate_limit始终应用于所有实例化的子任务或任务(这

python - celery + Django 错误 : TypeError: 'Settings' object is not subscriptable

我遇到了这个问题,但我完全不知道它是从哪里来的。我使用的代码与theCelerytutorial中提供的代码完全相同除了我将项目目录也用作我唯一的应用程序目录这一事实(因此,拥有models.py、views.py并将“proj”添加到INSTALLED_APPS)。奇怪的是,在我的本地机器上尝试这个设置时,错误总是出现在每个django命令(迁移、shell...)中。删除virtualenv并重新安装misteriously修复了它。现在,将应用程序推送到Heroku时,我遇到了同样的问题,无法正常工作。有什么提示吗?谢谢!ps:Python3.4.2,Django1.8.4,Ce

python - 使用本地计算机作为主机将 EC2 实例设置为 Celery Worker

类似于我的问题here我正在尝试设置多个亚马逊EC2实例来进行一些多处理。我正在考虑使用celery来管理worker。有没有人使用本地计算机作为主机在EC2实例上使用celery?有没有人有任何好的建议、教程、意见等可能有帮助?我用celery在django中做了一些简单的异步过程,但没有达到这种规模(工作人员和主机在同一台机器上)。而且大部分处理都是“基于文件的”(即读写文件)......你认为用celery腌制和传输文件的内容会更好吗(大多数文件是1-2kb的文本)或跨EC2实例镜像文件系统,然后让工作人员返回结果(通常是0.5kb的文本)。 最佳答案

python - 动态添加/删除线程到 celery 中的工作池

如何从任务中向当前多处理池添加更多线程(并删除线程)(即celeryd运行时CELERYD_CONCURRENCY=10,但我想即时将其更改为CELERYD_CONCURRENCY=15)?有一个函数叫做celery.concurrency.processes.TaskPool.Pool.grow但我不知道如何从正在运行的任务中调用它,也不知道它是否是执行该操作的正确函数。 最佳答案 阅读来源:https://github.com/ask/celery/blob/master/celery/concurrency/processes

python - 在不暂停服务的情况下将新代码部署到生产 celery 集群中的可靠方法

我有几个celery节点在生产中使用rabbitmq运行,我一直在处理服务中断的部署。我必须关闭整个站点才能将新代码部署到celery。我将每个child的最大任务设置为1,所以理论上,如果我对现有任务进行更改,它们应该在下次运行时生效,但是注册新任务呢?我知道重启守护进程不会杀死正在运行的worker,而是会让他们自己死掉,但这看起来仍然很危险。这个问题有优雅的解决方案吗? 最佳答案 这里具有挑战性的部分似乎是确定哪些celery任务是新的,哪些是旧的。我建议在rabbitmq中创建另一个虚拟主机并执行以下步骤:使用新代码更新Dj

python - celery 节拍队列包括过时的任务

我在Django中使用周期性的celery任务。我曾经在我的app/tasks.py文件中有以下任务:@periodic_task(run_every=timedelta(minutes=2))defstuff():...但是现在这个任务已经从我的app/tasks.py文件中删除了。但是,我一直在celery日志中看到对这个任务的调用:[2013-05-2107:08:37,963:ERROR/MainProcess]Receivedunregisteredtaskoftypeu'app.tasks.stuff'.我使用的celerybeatscheduler似乎没有更新它的队列。这

python - 为多个 celery worker 和线程正确设置 Flask-SQLAlchemy

当有多个celeryworker涉及多个线程并且都查询同一个数据库时,我正在努力使我的Flask、SQLAlchemy(mysql)和Celery设置正常工作。问题是我无法弄清楚如何以及在何处应用所需的更改,这些更改为flask应用程序和每个celeryworker提供了一个独立的数据库对象。据我所知,需要单独的session来避免讨厌的数据库错误,例如阻止其他数据库查询的不完整事务。这是我目前的项目结构/flask_celery.pyfromceleryimportCelerydefmake_celery(app):celery=Celery(app.import_name,back

python - 如何使用 Celery 延迟任务?

不是说delay方法。我希望能够获得一个任务,给定它的task_id并在执行之前即时更改它的ETA。现在我必须取消它,并重新安排一个。如果计划的过程涉及很多东西,那就麻烦了。 最佳答案 您应该在celery/任务队列之外存储一些“暂停”值。我用一个使用celery的postman来做这件事。我可以通过在memcache或mysql中设置值来暂停部分系统。然后任务确保在执行任务之前查询外部资源。如果它打算暂停,它会设置它执行task.retry()导致它经历重试延迟时间等。 关于pytho

python - Celery 的类似信号量的机制

我们正在为我们的任务队列开发一个使用Python+Celery的分布式应用程序。我们的应用程序要求我们通过IMAP(例如:gmail)从远程ISP下载电子邮件,我们希望能够并行完成此任务。对于给定的电子邮件帐户,您被授予有限数量的模拟连接,因此我们需要一种方法来自动跟踪所有正在下载的帐户的事件连接。我已经找到了多个使用Redis的Celery原子锁示例,但是没有一个可以跟踪像这样的有限资源池,并且所有实现我们自己的尝试都导致难以调试竞争条件,导致我们的锁间歇性地永远不会被释放。 最佳答案 由于celery使用multiprocess

python - 为每个 celery worker 创建单独的数据库连接

当工作人员在创建后执行任务时,我不断遇到奇怪的mysql问题。我们使用django1.3、celery3.1.17、djorm-ext-pool0.5我们以并发3启动celery进程。到目前为止,我的观察是,当工作进程启动时,它们都获得相同的mysql连接。我们记录数据库连接ID,如下所示。fromdjango.dbimportconnectionconnection.cursor()logger.info("Task%sprocessingwithdbconnection%s",str(task_id),str(connection.connection.thread_id()))当