草庐IT

轻松完成异步任务,一文搞懂Python Celery

学研君 2023-03-31 原文

虽然现代的网络应用比以往任何时候都更快速、更便捷,但仍有许多情况下,需要把繁重的任务转移到系统的其他部分执行,而不是在主线程上进行工作。

这些情况中的示例如下:

  • 周期性任务 —— 计划在特定时间间隔内运行的工作。例如,每日、每月的报告生成。
  • 第三方工具 —— 应用程序应该快速向用户返回响应,而不是等待其他任务先完成。例如,发送电子邮件、通知,将更新进度传递给内部工具。
  • 长时间运行的工作 —— 执行复杂或资源昂贵的工作,并且用户需要等待工作完成。例如。DAG工作流、基于Map-Reduce的任务、长时间运行的Spark作业等。

那么,如何处理这些情况呢?这时,Celery就派上用场了。

什么是Celery?

Celery是一个开源的任务队列实现,通常与基于Python的网络框架(如Flask和Django)相结合,在典型的请求-响应周期之外异步执行任务。

因此,Celery本质上是一个基于分布式消息传递的任务队列。执行单元或任务在一个或多个worker上使用多处理、gevent或Eventlet同时执行。这些任务可以同步执行(即等到准备就绪)或异步执行(即在后台)。

Celery是如何工作的?

Celery是一个分布式任务队列,基于生产者-消费者模式。

任务队列是用于跨线程和机器分配工作的机制,本质上是生产者(Web应用程序)和消费者(Celery工作者)之间的消息中介。

Celery通过消息进行交互,代理(broker)在客户(生产者)和工作者(消费者)之间充当中间人。为了启动任务,客户端将消息推送到队列中,然后代理将该消息传递给工作者。

Celery系统可以由多个worker和broker组成,这为高可用性和横向扩展提供了可能。

简而言之,Celery客户端是生产者,它通过消息代理向队列中添加新的任务。然后,Celery工作者同样通过消息代理从队列中获取新的任务。一旦处理完毕,结果就会存储在结果后端。

工作实例

下面的例子将使用RedisMQ作为消息代理。

设置Redis

在linux/macOS系统上,通过以下命令在本地运行Redis服务器:

$ wget http://download.redis.io/redis-stable.tar.gz
$ tar xvzf redis-stable.tar.gz
$ rm redis-stable.tar.gz
$ cd redis-stable
$ make

设置好Redis后,通过执行以下命令运行Redis服务器:

$ redis-server

该服务器在默认的6379端口运行。

设置应用程序

首先,在本地设置Python项目。

Celery可以通过标准工具如pip或easy_install来安装。通过以下命令安装Celery和Redis:

pip install celery redis==4.3.4

现在需要一个Celery实例来运行应用程序,Celery实现任何任务都是以实例开始,比如创建和管理任务等。

在项目中创建一个文件tasks.py:

From celery import Celery

broker_url = 'redi://localhost:6379/0'

app = Celery('tasks',broker = broker_url)

@app.task
def add(x, y):
return x+y

这里定义了一个简单的任务add(),返回两个数字的总和。

运行Celery Worker

在终端上,切换到项目位置并用以下命令运行Celery worker:

$ celery -A tasks worker - loglevel=info

关于Celery worker命令行的详细信息,可以使用help:

$ celery worker - help

调用任务

在Celery中,使用delay()方法来调用任务。

打开项目的另一个终端窗口并运行以下命令:

$ python

这将打开Python命令行。

>> from tasks import add
>> add.delay(1,2)

这将返回一个AsyncResult实例,可以用来检查任务状态,获得其返回值,等待任务完成,也可以在失败时获得异常和回溯。

运行add.delay()命令后,任务会被推送到队列中,然后被worker获取。这可以在Celery worker终端上进行验证,可以清楚地看到任务被接收,之后任务成功完成。

有关轻松完成异步任务,一文搞懂Python Celery的更多相关文章

  1. ruby - 其他文件中的 Rake 任务 - 2

    我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时

  2. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  3. ruby - 如何使用 RSpec::Core::RakeTask 创建 RSpec Rake 任务? - 2

    如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake

  4. 屏幕录制为什么没声音?检查这2项,轻松解决 - 2

    相信很多人在录制视频的时候都会遇到各种各样的问题,比如录制的视频没有声音。屏幕录制为什么没声音?今天小编就和大家分享一下如何录制音画同步视频的具体操作方法。如果你有录制的视频没有声音,你可以试试这个方法。 一、检查是否打开电脑系统声音相信很多小伙伴在录制视频后会发现录制的视频没有声音,屏幕录制为什么没声音?如果当时没有打开音频录制,则录制好的视频是没有声音的。因此,建议在录制前进行检查。屏幕上没有声音,很可能是因为你的电脑系统的声音被禁止了。您只需打开电脑系统的声音,即可录制音频和图画同步视频。操作方法:步骤1:点击电脑屏幕右下侧的“小喇叭”图案,在上方的选项中,选择“声音”。 步骤2:在“声

  5. ruby-on-rails - Rake 任务仅调用一次时执行两次 - 2

    我写了一个非常简单的rake任务来尝试找到这个问题的根源。namespace:foodotaskbar::environmentdoputs'RUNNING'endend当在控制台中执行rakefoo:bar时,输出为:RUNNINGRUNNING当我执行任何rake任务时会发生这种情况。有没有人遇到过这样的事情?编辑上面的rake任务就是写在那个.rake文件中的所有内容。这是当前正在使用的Rakefile。requireFile.expand_path('../config/application',__FILE__)OurApp::Application.load_tasks这里

  6. ruby-on-rails - 在 Ruby on Rails 中发送响应之前如何等待多个异步操作完成? - 2

    在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.

  7. ruby - 帮助使用 Ruby 中的 "Whenever"gem 来执行 cron 任务 - 2

    我以前没有使用过cron,所以我不能确定我这样做是对的。我想要自动化的任务似乎没有运行。我在终端中执行了这些步骤:sudogeminstall每当切换到应用程序目录无论何时。(这创建了文件schedule.rb)我将此代码添加到schedule.rb:every10.minutesdorunner"User.vote",environment=>"development"endevery:hourdorunner"Digest.rss",:environment=>"development"end我将此代码添加到deploy.rb:after"deploy:symlink","depl

  8. ruby - 在 rake 任务中运行 capybara - 2

    如何在Rake任务中运行Capybara功能?例如:访问('http://google.com')谢谢! 最佳答案 在任务中尝试这样的事情:require'capybara'require'capybara/dsl'Capybara.current_driver=:seleniumBrowser=Class.new{includeCapybara::DSL}page=Browser.new.pagepage.visit("http://www.google.com")puts(page.html)

  9. Ruby:行 "m = Hash.new {|h,k| h[k] = []}"完成了什么而 "Hash.new"没有完成? - 2

    一边学习thisRailscast我从Rack中看到了以下源代码:defself.middleware@middleware||=beginm=Hash.new{|h,k|h[k]=[]}m["deployment"].concat[[Rack::ContentLength],[Rack::Chunked],logging_middleware]m["development"].concatm["deployment"]+[[Rack::ShowExceptions],[Rack::Lint]]mendend我的问题是关于第三行。什么是传递block{|h,k|h[k]=[]}到Has

  10. ruby - 在 Rakefile 中动态生成 Rake 测试任务(基于现有的测试文件) - 2

    我正在根据Rakefile中的现有测试文件动态生成测试任务。假设您有各种以模式命名的单元测试文件test_.rb.所以我正在做的是创建一个以“测试”命名空间内的文件名命名的任务。使用下面的代码,我可以用raketest:调用所有测试require'rake/testtask'task:default=>'test:all'namespace:testdodesc"Runalltests"Rake::TestTask.new(:all)do|t|t.test_files=FileList['test_*.rb']endFileList['test_*.rb'].eachdo|task|n

随机推荐