上篇文章我们学习了Flask框架——MongoEngine使用MongoDB数据库,这篇文章我们学习Flask框架——基于Celery的后台任务。
在Web开发中,我们经常会遇到一些耗时的操作,例如:上传/下载数据、发送邮件/短信,执行各种任务等等。这时我们可以使用分布式异步消息任务队列去执行这些任务。
Celery是一款非常简单、灵活、可靠的分布式异步消息队列工具,可以用于处理大量消息、实时数据以及任务调度。
Celery通过消息机制进行通信,一般使用中间人(Broker)作为客户端和职程(Worker)调节。
其工作流程如下图所示:
客户端发送消息任务给中间人(Broker),任务执行单元(Celery Worker)监控中间人中的任务队列,当中间人有消息任务时就分配任务给任务执行单元,任务执行单元在后台运行任务并返回请求。
注意:Celery可以有多个职程(Worker)和中间人(Broker),用来提高Celery的高可用性以及横向扩展能力。
Celery优点:
Celery安装方式很简单,执行如下命令即可:
pip install celery
这里我们使用redis作为中间人,执行如下代码安装redis:
pip install redis
(1)不使用Celery执行耗时任务,创建一个名为test.py文件,其示例代码如下:
import time
def add(a,b):
time.sleep(5) #休眠5秒
return a+b
if __name__ == '__main__':
print('开始执行')
result=add(2,3) #调用add函数
print('执行结束')
print(result)
运行test.py文件,运行结果如下图:
(2)使用Celery执行耗时任务,创建一个名为tasks.py文件,示例代码如下:
import time
from celery import Celery
celery = Celery( #实例化Celery对象
'tasks', #当前模块名
broker='redis://localhost:6379/1', #使用redis为中间人
backend='redis://localhost:6379/2' #结果存储
)
@celery.task() #使用异步任务装饰器task
def add(a,b):
time.sleep(5) #休眠5秒
return a+b
if __name__ == '__main__':
print('开始执行')
result=add.delay(2,3) #调用add方法并使用delay延时函数
print('执行结束')
print(result)
实例化Celery对象,其中第一个参数为当前模块名,第二个参数为中间人(Broker)的URL链接,第三个参数为中间人结果放回的存储URL链接,再调用add()方法时,需要使用delay延时函数。
运行tasks.py文件,运行结果如下图所示:
当我们运行tasks.py文件时,发现程序一下子就运行结束并返回任务id,
在终端执行如下代码运行Celery职程(Worker)服务:
celery -A tasks worker -l info
如下图所示:
虽然职程已经收到任务并且在分配到子进程运行了,但是发现该任务没有运行结束,这时因为Celery不支持在windows下运行任务,需要借助eventlet来完成,执行如下安装eventlet:
pip install eventlet
安装成功后,执行如下代码运行Celery职程(Worker)服务:
celery -A tasks worker -l info -P eventlet -c 10
运行结果如下:
大多数情况下,使用默认的配置即可满足我们的开发,不需要修改配置,当我们需要修改配置时,可以通过update进行配置,在上面的tasks.py添加如下代码:
celery.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Europe/Oslo',
enable_utc=True,
)
其中:
accept_content:允许的内容类型/序列化程序的白名单,如果收到不在此列表中的消息,则该消息将被丢弃并出现错误,默认只为json;
task_serializer:标识要使用的默认序列化方法的字符串,默认值为json;
result_serializer:结果序列化格式,默认值为json;
timezone:配置Celery以使用自定义时区;
enable_utc:启用消息中的日期和时间,将转换为使用 UTC 时区,与timezone连用,当设置为 false 时,将使用系统本地时区。
除了上面的配置参数,Celery还提供了很多很多配置参数,大家可以在官方配置文档中查看
Celery的配置信息比较多,通常情况下,我们会在tasks.py同级目录下为创建Celery的配置文件, 这里命名为celeryconfig.py,示例代码如下:
broker_url = 'redis://localhost:6379/1'
result_backend = 'redis://localhost:6379/2'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True
执行如下代码加载配置:
import celeryconfig
app.config_from_object('celeryconfig')
首先创建一个名为mycelery.py文件,该文件用来实例化Celery对象,示例代码如下:
from celery import Celery
def make_celery(app):
celery = Celery( #实例化Celery
'tasks',
broker='redis://localhost:6379/1', #使用redis为中间人
backend='redis://localhost:6379/2' #结果存储
)
class ContextTask(celery.Task): #创建ContextTask类并继承Celery.Task子类
def __call__(self, *args, **kwargs):
with app.app_context(): #和Flask中的app建立关系
return self.run(*args, **kwargs) #返回任务
celery.Task = ContextTask #异步任务实例化ContextTask
return celery #返回celery对象
首先自定义一个名为make_celery()方法,该方法传入Flask程序中的app,在方法中实例化Celery,并创建一个名为ContextTask类用来和Flask中的app建立关系,最后返回celery。
创建名为tasks.py文件,该文件用来存放我们的耗时任务,示例代码如下:
import time
from app import celery
@celery.task #使用异步任务装饰器task
def add(x, y):
time.sleep(5) #休眠5秒
return x + y
这里我们通过休眠的方式来模拟耗时的下载任务。
Flask程序app.py文件示例代码如下:
from flask import Flask
import tasks
from mycelery import make_celery
app = Flask(__name__)
celery = make_celery(app) #调用make_celery方法并传入app使celery和app进行关联
@app.route('/')
def hello():
tasks.add.delay(1,2) #调用tasks文件中的add()异步任务方法
return '请求正在后台处理中,您可以去处理其他事情'
if __name__ == '__main__':
app.run(debug=True)
app.py文件很简单,就调用make_celery方法使celery和app进行关联,并在视图函数中使用tasks中的异步任务方法。
在终端执行如下代码运行Celery职程(Worker)服务:
celery -A tasks worker -l info -P eventlet -c 10
启动Flask程序,访问http://127.0.0.1:5000/后在终端查Worker服务,如下图所示:
当我们不使用Celery时,用户在执行耗时任务时,用户可能要等耗时任务完成后,才能进行其他操作。
好了,Flask框架——基于Celery的后台任务就讲到这里了,感谢观看,下篇文章继续学习Flask框架的其他知识。
公众号:白巧克力LIN
该公众号发布Python、数据库、Linux、Flask、自动化测试、Git等相关文章!
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake
导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵
C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.
需求:要创建虚拟机,就需要给他提供一个虚拟的磁盘,我们就在/opt目录下创建一个10G大小的raw格式的虚拟磁盘CentOS-7-x86_64.raw命令格式:qemu-imgcreate-f磁盘格式磁盘名称磁盘大小qemu-imgcreate-f磁盘格式-o?1.创建磁盘qemu-imgcreate-fraw/opt/CentOS-7-x86_64.raw10G执行效果#ls/opt/CentOS-7-x86_64.raw2.安装虚拟机使用virt-install命令,基于我们提供的系统镜像和虚拟磁盘来创建一个虚拟机,另外在创建虚拟机之前,提前打开vnc客户端,在创建虚拟机的时候,通过vnc
Transformers开始在视频识别领域的“猪突猛进”,各种改进和魔改层出不穷。由此作者将开启VideoTransformer系列的讲解,本篇主要介绍了FBAI团队的TimeSformer,这也是第一篇使用纯Transformer结构在视频识别上的文章。如果觉得有用,就请点赞、收藏、关注!paper:https://arxiv.org/abs/2102.05095code(offical):https://github.com/facebookresearch/TimeSformeraccept:ICML2021author:FacebookAI一、前言Transformers(VIT)在图
我是ruby的新手,我认为重新构建一个我用C#编写的简单聊天程序是个好主意。我正在使用Ruby2.0.0MRI(Matz的Ruby实现)。问题是我想在服务器运行时为简单的服务器命令提供I/O。这是从示例中获取的服务器。我添加了使用gets()获取输入的命令方法。我希望此方法在后台作为线程运行,但该线程正在阻塞另一个线程。require'socket'#Getsocketsfromstdlibserver=TCPServer.open(2000)#Sockettolistenonport2000defcommandsx=1whilex==1exitProgram=gets.chomp
我写了一个非常简单的rake任务来尝试找到这个问题的根源。namespace:foodotaskbar::environmentdoputs'RUNNING'endend当在控制台中执行rakefoo:bar时,输出为:RUNNINGRUNNING当我执行任何rake任务时会发生这种情况。有没有人遇到过这样的事情?编辑上面的rake任务就是写在那个.rake文件中的所有内容。这是当前正在使用的Rakefile。requireFile.expand_path('../config/application',__FILE__)OurApp::Application.load_tasks这里
我以前没有使用过cron,所以我不能确定我这样做是对的。我想要自动化的任务似乎没有运行。我在终端中执行了这些步骤:sudogeminstall每当切换到应用程序目录无论何时。(这创建了文件schedule.rb)我将此代码添加到schedule.rb:every10.minutesdorunner"User.vote",environment=>"development"endevery:hourdorunner"Digest.rss",:environment=>"development"end我将此代码添加到deploy.rb:after"deploy:symlink","depl
如何在Rake任务中运行Capybara功能?例如:访问('http://google.com')谢谢! 最佳答案 在任务中尝试这样的事情:require'capybara'require'capybara/dsl'Capybara.current_driver=:seleniumBrowser=Class.new{includeCapybara::DSL}page=Browser.new.pagepage.visit("http://www.google.com")puts(page.html)