草庐IT

Flask-定时任务

lllhhhv 2023-04-11 原文

目录

一、简单使用

二、apscheduler 

triggers

job stores

schedulers

使用上下文

日志设置


from flask import Flask

app = Flask(__name__)
app.config.from_object('config.config')

# 初始化db
db = SQLAlchemy()
db.init_app(app)


# 初始化定时任务
from models.taskSchedule import scheduler
scheduler.init_app(app)
scheduler.start()
# 修改调度器执行组件冗余日志级别
logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)

taskSchedule.py:

from flask_apscheduler import APScheduler


scheduler = APScheduler()



# interval example, 间隔执行, 每30秒执行一次
@scheduler.task('interval', id='do_job_1', seconds=30, misfire_grace_time=900)
def job1():
    print('Job 1 executed')


# cron examples, 每分钟执行一次
@scheduler.task('cron', id='do_job_2', minute='*')
def job2():
    print('Job 2 executed')

# 每周执行一次
@scheduler.task('cron', id='do_job_3', week='*', day_of_week='sun')
def job3():
    print('Job 3 executed')

也可以在程勋运行后添加定时任务:

scheduler.start()
scheduler.add_job(**args)

二、apscheduler 

apscheduler 四个组件:

  • triggers: 任务触发器组件,提供任务触发方式
  • job stores: 任务商店组件,提供任务保存方式
  • executors: 任务调度组件,提供任务调度方式
  • schedulers: 任务调度组件,提供任务工作方式

triggers

支持三种任务触发方式

  • date:

固定日期触发器,任务只运行一次,运行完毕自动清除;若错过指定运行时间,任务不会被创建

| 参数 | 说明 |
| :——————————– | :——————- |
| run_date (datetime 或 str) | 作业的运行日期或时间 |
| timezone (datetime.tzinfo 或 str) | 指定时区 |

例如# 在 2019-4-24 00:00:01 时刻运行一次 start_system 方法
scheduler .add_job(start_system, 'date', run_date='2019-4-24 00:00:01', args=['text'])

 

  • interval:

时间间隔触发器,每个一定时间间隔执行一次。

| 参数 | 说明 |
| —————————- | ———- |
| weeks (int) | 间隔几周 |
| days (int) | 间隔几天 |
| hours (int) | 间隔几小时 |
| minutes (int) | 间隔几分钟 |
| seconds (int) | 间隔多少秒 |
| start_date (datetime 或 str) | 开始日期 |
| end_date (datetime 或 str) | 结束日期 |

# 在 2019-4-24 00:00:00 - 2019-4-24 08:00:00 之间, 每隔两小时执行一次 alarm_job 方法
scheduler .add_job(alarm_job, 'interval', hours=2, start_date='2019-4-24 00:00:00' , end_date='2019-4-24 08:00:00')

  • cron:

cron风格的任务触发

参数说明
year (int 或 str)表示四位数的年份 (2019)
month(int\str)月 (范围1-12)
day(int\str)日 (范围1-31)
week(int\str)周 (范围1-53)
day_of_week (int\str)表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示
hour (int\str)表示取值范围为0-23时
minute (int\str)表示取值范围为0-59分
second (int\str)表示取值范围为0-59秒
start_date (datetime\str)表示开始时间
end_date (datetime\str)表示结束时间
timezone (datetime.tzinfo\str)表示时区取值

(int|str) 表示参数既可以是int类型,也可以是str类型
(datetime | str) 表示参数既可以是datetime类型,也可以是str类型

例如:表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5

sched.add_job(my_job, 'cron',second = '*/5')

job stores

支持四种任务存储方式

  • memory:默认配置任务存在内存中
  • mongdb:支持文档数据库存储
  • sqlalchemy:支持关系数据库存储
  • redis:支持键值对数据库存储

schedulers

调度器主要分三种,一种独立运行的,一种是后台运行的,最后一种是配合其它程序使用

  • BlockingScheduler: 当这个调度器是你应用中 唯一要运行 的东西时使用
  • BackgroundScheduler: 当 不运行其它框架 的时候使用,并使你的任务在 后台运行
  • AsyncIOScheduler: 当你的程序是 异步IO模型 的时候使用
  • GeventScheduler: 和 gevent 框架配套使用
  • TornadoScheduler: 和 tornado 框架配套使用
  • TwistedScheduler: 和 Twisted 框架配套使用
  • QtScheduler: 开发 qt 应用的时候使用

Flask-APScheduler 中默认使用的就是 BackgroundScheduler:

scheduler.py:

#.....

class APScheduler(object):
    """Provides a scheduler integrated to Flask."""

    def __init__(self, scheduler=None, app=None):
        self._scheduler = scheduler or BackgroundScheduler()
        self._host_name = socket.gethostname().lower()
        self._authentication_callback = None


# .....

使用上下文

如果正在使用 Flask-SQLAlchemy 并在定时任务中执行数据库操作,需要提供 Flask 应用程序上下文:

from flask_apscheduler import APScheduler
scheduler = APScheduler()
 
@scheduler.task(
     "interval",
      id="update_news_graph_job",
     minutes = 16
 )
def update_news_graph():

        # 提供flask上下文对象
        with scheduler.app.app_context():
            result = db.session.execute(query_sql)
当然, scheduler 需要在flask中注册:
scheduler.init_app(app)
scheduler.start()

官网关于上下文的解释:

Basic Application - Flask APScheduler

Introduction into Contexts — Flask-SQLAlchemy Documentation (2.x)

日志设置

如果定时任务执行间隔几秒钟, 调度程序的日志会很多,可以设置调度程序日志级别或完全禁用:

#设置调度程序的日志级别, 原本级别为info

scheduler.start()
scheduler.add_job(every_minute, trigger='cron', second=0, id='every_minute')
logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)


#或者禁用调度程序日志
logging.getLogger('apscheduler.executors.default').propagate = False

有关Flask-定时任务的更多相关文章

  1. ruby - 其他文件中的 Rake 任务 - 2

    我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时

  2. ruby - 如何使用 RSpec::Core::RakeTask 创建 RSpec Rake 任务? - 2

    如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake

  3. ruby-on-rails - Rake 任务仅调用一次时执行两次 - 2

    我写了一个非常简单的rake任务来尝试找到这个问题的根源。namespace:foodotaskbar::environmentdoputs'RUNNING'endend当在控制台中执行rakefoo:bar时,输出为:RUNNINGRUNNING当我执行任何rake任务时会发生这种情况。有没有人遇到过这样的事情?编辑上面的rake任务就是写在那个.rake文件中的所有内容。这是当前正在使用的Rakefile。requireFile.expand_path('../config/application',__FILE__)OurApp::Application.load_tasks这里

  4. ruby - 帮助使用 Ruby 中的 "Whenever"gem 来执行 cron 任务 - 2

    我以前没有使用过cron,所以我不能确定我这样做是对的。我想要自动化的任务似乎没有运行。我在终端中执行了这些步骤:sudogeminstall每当切换到应用程序目录无论何时。(这创建了文件schedule.rb)我将此代码添加到schedule.rb:every10.minutesdorunner"User.vote",environment=>"development"endevery:hourdorunner"Digest.rss",:environment=>"development"end我将此代码添加到deploy.rb:after"deploy:symlink","depl

  5. ruby - 在 rake 任务中运行 capybara - 2

    如何在Rake任务中运行Capybara功能?例如:访问('http://google.com')谢谢! 最佳答案 在任务中尝试这样的事情:require'capybara'require'capybara/dsl'Capybara.current_driver=:seleniumBrowser=Class.new{includeCapybara::DSL}page=Browser.new.pagepage.visit("http://www.google.com")puts(page.html)

  6. ruby - 在 Rakefile 中动态生成 Rake 测试任务(基于现有的测试文件) - 2

    我正在根据Rakefile中的现有测试文件动态生成测试任务。假设您有各种以模式命名的单元测试文件test_.rb.所以我正在做的是创建一个以“测试”命名空间内的文件名命名的任务。使用下面的代码,我可以用raketest:调用所有测试require'rake/testtask'task:default=>'test:all'namespace:testdodesc"Runalltests"Rake::TestTask.new(:all)do|t|t.test_files=FileList['test_*.rb']endFileList['test_*.rb'].eachdo|task|n

  7. ruby-on-rails - 使用 Rspec 测试 rake 任务不接受参数 - 2

    根据thispostbyStephenHagemann,我正在尝试为我的一个rake任务编写Rspec测试.lib/tasks/retry.rake:namespace:retrydotask:message,[:message_id]=>[:environment]do|t,args|TextMessage.new.resend!(args[:message_id])endendspec/tasks/retry_spec.rb:require'rails_helper'require'rake'describe'retrynamespaceraketask'dodescribe're

  8. ruby-on-rails - 在 gem 的 rake 任务中需要 gem - 2

    我正在使用jeweler为Rails3创建一个gem。该gem包含一个rake任务,它所做的其中一件事是删除数据库,所以我正在使用“database_cleaner”。我在gem的Gemfile中指定gem依赖项gem'database_cleaner'在Rakefile中Jeweler::Tasks.newdo|gem|...gem.add_dependency'database_cleaner'end然后在lib中我创建了文件my_gem.rb和tasks.rake。如下,my_gem.rb:moduleMyGemclassRailtie和tasks.rake:task:my_ta

  9. ruby-on-rails - 在调用 Rake 任务时设置多个环境变量 - 2

    我可以像这样调用一个Rake任务并设置一个环境变量:$ONE=1raketemp:both但是如何设置两个环境变量呢?这行不通:$ONE=1TWO=2raketemp:both这行得通,但读起来很困惑:$ONE=1raketemp:bothTWO=2如何在调用rake之前传递多个环境? 最佳答案 同意@Ernest;它应该工作。这是一个示例...示例rake任务以回显变量:task:echo_envdoputs"VAR1:#{ENV['VAR1']}"puts"VAR2:#{ENV['VAR2']}"end执行任务:VAR1=fir

  10. ruby - 如何让 Ruby 每 10 分钟运行一次任务? - 2

    我想每10分钟执行一次cron作业,但我的系统只执行1小时。所以我正在寻找一种方法来做到这一点。我看过Timer和sleep但我不确定如何执行此操作,甚至不知道如何实现此操作。 最佳答案 看看http://rufus.rubyforge.org/rufus-scheduler/rufus-scheduler是一个用于调度代码片段(作业)的Rubygem。它了解在特定时间、在特定时间、每x次或仅通过CRON语句运行作业。rufus-scheduler不能替代cron/at,因为它在Ruby内部运行。

随机推荐