目录
flask-apscheduler将apscheduler移植到了flask应用中,使得在flask中可以非常方便的使用定时任务了,除此之外,它还有如下几个特性
Flask配置加载调度器配置Flask配置加载任务调度器RESTful API管理任务,也就是远程管理任务RESTful API提供认证下载安装
pip install flask-apscheduler
flask-apscheduler的相关配置,我们会将它和其它扩展一起,放在应用的配置里
class Config(object):
// 配置项
JOBS = [
{
'id': 'job1',
'func': 'run:add',
'args': (1, 2),
'trigger': 'interval',
'seconds': 3
}
]
SCHEDULER_API_ENABLED = True
def add(a, b):
print(a+b)
JOBS列表的每一个元素表示一个定时任务,列子中只有一个interval任务,表示每隔3秒运行一次函数add。func指定调用的函数,args表示传入函数的参数,trigger表示启动方式,常用的有两种,分别是trigger和cron。
上边我们设置了SCHEDULER_API_ENABLED = True,可以通过访问 http://127.0.0.1:5000/scheduler ,其中scheduler是默认的RESTful API前缀

通过查看源码,可以发现flask-apscheduler提供了以下的接口
def _load_api(self):
"""
Add the routes for the scheduler API.
"""
self._add_url_route('get_scheduler_info', '', api.get_scheduler_info, 'GET')
self._add_url_route('add_job', '/jobs', api.add_job, 'POST')
self._add_url_route('get_job', '/jobs/<job_id>', api.get_job, 'GET')
self._add_url_route('get_jobs', '/jobs', api.get_jobs, 'GET')
self._add_url_route('delete_job', '/jobs/<job_id>', api.delete_job, 'DELETE')
self._add_url_route('update_job', '/jobs/<job_id>', api.update_job, 'PATCH')
self._add_url_route('pause_job', '/jobs/<job_id>/pause', api.pause_job, 'POST')
self._add_url_route('resume_job', '/jobs/<job_id>/resume', api.resume_job, 'POST')
self._add_url_route('run_job', '/jobs/<job_id>/run', api.run_job, 'POST')
如果需要查看当前运行的所有定时任务,则请求http://127.0.0.1:5000/scheduler/jobs即可。
interval表示间隔启动,在interval方式中,使用seconds配置间隔多久启动一次,单位是秒。
cron表示定时启动
class Config(object):
JOBS = [
{
'id': 'job1',
'func': 'scheduler:task',
'args': (1, 2),
'trigger': 'cron',
'day': '*',
'hour': '13',
'minute': '16',
'second': '20'
}
]
SCHEDULER_API_ENABLED = True
def task(a, b):
print(str(datetime.datetime.now()) + ' execute task ' + '{}+{}={}'.format(a, b, a + b))
该配置项则表示每天的13点16分20秒启动一次。*表示全部。
有关常用的cron配置有:
['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
from flask import Flask
from flask_apscheduler import APScheduler
import datetime
class Config(object):
SCHEDULER_API_ENABLED = True
scheduler = APScheduler()
# interval examples
@scheduler.task('interval', id='do_job_1', seconds=30, misfire_grace_time=900)
def job1():
print(str(datetime.datetime.now()) + ' Job 1 executed')
表示每隔30秒调用一次job1函数。
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake
我写了一个非常简单的rake任务来尝试找到这个问题的根源。namespace:foodotaskbar::environmentdoputs'RUNNING'endend当在控制台中执行rakefoo:bar时,输出为:RUNNINGRUNNING当我执行任何rake任务时会发生这种情况。有没有人遇到过这样的事情?编辑上面的rake任务就是写在那个.rake文件中的所有内容。这是当前正在使用的Rakefile。requireFile.expand_path('../config/application',__FILE__)OurApp::Application.load_tasks这里
我以前没有使用过cron,所以我不能确定我这样做是对的。我想要自动化的任务似乎没有运行。我在终端中执行了这些步骤:sudogeminstall每当切换到应用程序目录无论何时。(这创建了文件schedule.rb)我将此代码添加到schedule.rb:every10.minutesdorunner"User.vote",environment=>"development"endevery:hourdorunner"Digest.rss",:environment=>"development"end我将此代码添加到deploy.rb:after"deploy:symlink","depl
如何在Rake任务中运行Capybara功能?例如:访问('http://google.com')谢谢! 最佳答案 在任务中尝试这样的事情:require'capybara'require'capybara/dsl'Capybara.current_driver=:seleniumBrowser=Class.new{includeCapybara::DSL}page=Browser.new.pagepage.visit("http://www.google.com")puts(page.html)
我正在根据Rakefile中的现有测试文件动态生成测试任务。假设您有各种以模式命名的单元测试文件test_.rb.所以我正在做的是创建一个以“测试”命名空间内的文件名命名的任务。使用下面的代码,我可以用raketest:调用所有测试require'rake/testtask'task:default=>'test:all'namespace:testdodesc"Runalltests"Rake::TestTask.new(:all)do|t|t.test_files=FileList['test_*.rb']endFileList['test_*.rb'].eachdo|task|n
根据thispostbyStephenHagemann,我正在尝试为我的一个rake任务编写Rspec测试.lib/tasks/retry.rake:namespace:retrydotask:message,[:message_id]=>[:environment]do|t,args|TextMessage.new.resend!(args[:message_id])endendspec/tasks/retry_spec.rb:require'rails_helper'require'rake'describe'retrynamespaceraketask'dodescribe're
我正在使用jeweler为Rails3创建一个gem。该gem包含一个rake任务,它所做的其中一件事是删除数据库,所以我正在使用“database_cleaner”。我在gem的Gemfile中指定gem依赖项gem'database_cleaner'在Rakefile中Jeweler::Tasks.newdo|gem|...gem.add_dependency'database_cleaner'end然后在lib中我创建了文件my_gem.rb和tasks.rake。如下,my_gem.rb:moduleMyGemclassRailtie和tasks.rake:task:my_ta
我可以像这样调用一个Rake任务并设置一个环境变量:$ONE=1raketemp:both但是如何设置两个环境变量呢?这行不通:$ONE=1TWO=2raketemp:both这行得通,但读起来很困惑:$ONE=1raketemp:bothTWO=2如何在调用rake之前传递多个环境? 最佳答案 同意@Ernest;它应该工作。这是一个示例...示例rake任务以回显变量:task:echo_envdoputs"VAR1:#{ENV['VAR1']}"puts"VAR2:#{ENV['VAR2']}"end执行任务:VAR1=fir
我想每10分钟执行一次cron作业,但我的系统只执行1小时。所以我正在寻找一种方法来做到这一点。我看过Timer和sleep但我不确定如何执行此操作,甚至不知道如何实现此操作。 最佳答案 看看http://rufus.rubyforge.org/rufus-scheduler/rufus-scheduler是一个用于调度代码片段(作业)的Rubygem。它了解在特定时间、在特定时间、每x次或仅通过CRON语句运行作业。rufus-scheduler不能替代cron/at,因为它在Ruby内部运行。