
本文将从单机定时调度开始,循序渐进地带领大家了解五福定制三层分发任务处理框架。
一、背景介绍
二、定时任务分类

2.1、单机任务
单机定时任务毫无疑问是在单台机器上运行的定时任务。在业务量级不大,没有进行分库分表时,往往单机定时任务即可满足业务需求。
从复杂度上来说,单机定时任务又可分为简单的定时调度和定时调度+批处理两种。
在Spring中可以通过@Scheduled 来启用定时任务。触发的方式有两种,分别是:cron 表达式和 fixedRated类配置参数。常用的案例:
// cron表达式
@Scheduled(cron="0 0/30 9-17 * * ?") //按cron规则执行,朝九晚五工作时间内每半小时
@Scheduled(cron="0 0 12 ? * WED") //按cron规则执行,表示每个星期三中午12点
// fixedRated类配置
@Scheduled(fixedRate=5000) //上一次开始执行时间点后5秒再次执行;
@Scheduled(fixedDelay=3000) //上一次执行完毕时间点后3秒再次执行;
@Scheduled(initialDelay=1000, fixedDelay=2000) //第一次延迟1秒执行,然后在上一次执行完毕时间点后2秒再次执行;

定时调度往往用于业务处理流程比较简单的场景,比如定时生成简单报表,发送通知。对于复杂耗时的场景,处理效率不高,业务高峰期会积压大量待处理数据,影响业务。
为了解决复杂耗时场景下定时调度效率不高的问题,可以引入批处理框架。定时调度与批处理框架相结合,可以大幅提高数据处理的效率,提升系统稳定性,保障业务稳定运行。
以Spring Batch批处理框架为例,任务处理流程如下:

Spring Batch批处理框架将任务拆分成多个Step,同时每个Step里面又分为itemReader,itemProcessor, itemWriter。通过将任务分层细化,能够让多个阶段并行处理,提高任务处理效率。批处理框架结合定时调度框架,可以在单机情况下,对大量复杂的业务进行高效的批处理。
2.2、集群任务
在分库分表大业务流量情况下,单机定时任务已无法满足业务需求了,这时就产生了集群定时任务。在支付宝技术架构下,用户数据按照eid进行分库分表,同时进行Zone维度的隔离。此时单机定时任务无法对全量数据做处理,于是支付宝便有了自己的分布式任务调度中间件Antscheduler,配合三层分发任务处理框架,就可以对大量数据进行定时批量处理。


/**
* 根据配置中心的dataFlag过滤
* 1、默认ALL不区分
* 2、ODD 表示仅分发奇数表号
* 3、EVEN 标识仅分发偶数表号
*
* @param eidList
*/
public void filteByDataFlag(List<String> eidList) {
String dataFlag = SchedulerConfigDrmUtil.getIndexFilterFlag();
int strategy = -1;
if (StringUtil.equalsIgnoreCase("ODD", dataFlag)) {
strategy = 1;
} else if (StringUtil.equalsIgnoreCase("EVEN", dataFlag)) {
strategy = 0;
}
if (strategy == -1) {
// ALL
return;
}
// filter
Iterator<String> it = eidList.iterator();
while (it.hasNext()) {
String str = it.next();
int index = NumberUtils.toInt(str, -1);
if (index % 2 != strategy) {
it.remove();
}
}
}
通过代码可知,推送任务配置时,将A组机器的值推成“ODD”,B组机器的值推成“EVEN”,即可实现A/B组的所有机器同时执行定时任务的效果。
/**
* 计算定时任务的相关配置
*
* 主要计算:
* scheduleSingleLimit 单机限流值
* scheduleLoaderCount loader捞取条数
*
* @param scheduleConfig
*/
public static SchedulerConfig calculateScheduleConfig(SchedulerConfig scheduleConfig) {
final int qpsLimit = scheduleConfig.getScheduleWholeLimit();
final int machineCounts = scheduleConfig.getScheduleMachineCounts();
MtLogger.info(LOGGER,
"【计算定时任务配置】-开始 任务名称:{0},机器数量:{1},任务吞吐量:{2}.", scheduleConfig.getScheduleType(),
machineCounts, qpsLimit);
//定时任务的调度频率是scheduleRate 秒执行一次 所以scheduleRate秒中内集群的整体吞吐量=qps限制*scheduleRate
final int scheduleRate = scheduleConfig.getScheduleRatePerSec();
long totalLoaderCounts = qpsLimit * TimeUnit.SECONDS.toSeconds(scheduleRate);
//定时任务捞取的表数量为1000 所以到每个表的限制=totalLoaderCounts/1000
long loaderCountPerTask = totalLoaderCounts / 1000;
if (loaderCountPerTask < 1) {
loaderCountPerTask = 1;
}
scheduleConfig.setScheduleLoaderCount((int) loaderCountPerTask);
//整体限流通过单机限流实现 整体限流=单机限流*machineCounts
final double singleQps = ((double) qpsLimit / machineCounts);
//创建的限流需要1秒的预热
scheduleConfig.setScheduleSingleLimit(RateLimiter.create(singleQps, 1, TimeUnit.SECONDS));
MtLogger.info(LOGGER,
"【计算定时任务配置】-结束 任务名称:{0},捞取条数:{1},单机限流:{2}.", scheduleConfig.getScheduleType(),
scheduleConfig.getScheduleLoaderCount(),
scheduleConfig.getScheduleSingleLimit().getRate());
return scheduleConfig;
}
通过代码可知,推送的任务配置最终会生成两个重要的配置信息:

相比通常情况下指定Loader每次捞取的任务数,五福是通过集群qps和任务调度间隔来确定Loader需要捞取的任务数。因此一个调度间隔内的任务数和集群能够执行的任务数是匹配上的,加上通过单机qps限制达到集群qps限制的效果,从而让整个定时任务做到了平滑调用。简而言之,优化后的调度逻辑,能够让定时调度任务在机器维度和时间维度都能均匀平稳的执行。

三、结语
从单机到集群,再到五福定制集群定时任务,本文逐步做了一个框架设计上的原理介绍。每种定时任务都有自己的优点和缺陷,也都有自己的应用场景。在工作中,要结合当前的业务情况,选择合适的定时任务进行业务处理,避免设计上的失误导致业务受损。以五福定制三层分发任务处理框架为例,虽然日常业务中,因为机器数量不固定,依旧无法做到任务的平滑调用,但我们可以借鉴最大化利用集群机器资源这一点,同时开启A/B组的定时任务,从而实现任务调度真正的负载均衡,提高系统整体的稳定性。
作者|金盛杰(司旭)
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake
?博客主页:https://xiaoy.blog.csdn.net?本文由呆呆敲代码的小Y原创,首发于CSDN??学习专栏推荐:Unity系统学习专栏?游戏制作专栏推荐:游戏制作?Unity实战100例专栏推荐:Unity实战100例教程?欢迎点赞?收藏⭐留言?如有错误敬请指正!?未来很长,值得我们全力奔赴更美好的生活✨------------------❤️分割线❤️-------------------------
我写了一个非常简单的rake任务来尝试找到这个问题的根源。namespace:foodotaskbar::environmentdoputs'RUNNING'endend当在控制台中执行rakefoo:bar时,输出为:RUNNINGRUNNING当我执行任何rake任务时会发生这种情况。有没有人遇到过这样的事情?编辑上面的rake任务就是写在那个.rake文件中的所有内容。这是当前正在使用的Rakefile。requireFile.expand_path('../config/application',__FILE__)OurApp::Application.load_tasks这里
我需要使用ActiveMerchant库在我们的一个Rails应用程序中设置支付解决方案。尽管这个问题非常主观,但人们对主要网关(BrainTree、Authorize.net等)的体验如何?它必须:处理定期付款。有能力记入个人帐户。能够取消付款。有办法存储用户的付款详细信息(例如Authotize.netsCIM)。干杯 最佳答案 ActiveMerchant很棒,但在过去一年左右的时间里,我在使用它时发现了一些问题。首先,虽然某些网关可能会得到“支持”——但并非所有功能都包含在内。查看功能矩阵以确保完全支持您选择的网关-http
我以前没有使用过cron,所以我不能确定我这样做是对的。我想要自动化的任务似乎没有运行。我在终端中执行了这些步骤:sudogeminstall每当切换到应用程序目录无论何时。(这创建了文件schedule.rb)我将此代码添加到schedule.rb:every10.minutesdorunner"User.vote",environment=>"development"endevery:hourdorunner"Digest.rss",:environment=>"development"end我将此代码添加到deploy.rb:after"deploy:symlink","depl
如何在Rake任务中运行Capybara功能?例如:访问('http://google.com')谢谢! 最佳答案 在任务中尝试这样的事情:require'capybara'require'capybara/dsl'Capybara.current_driver=:seleniumBrowser=Class.new{includeCapybara::DSL}page=Browser.new.pagepage.visit("http://www.google.com")puts(page.html)
我是一名决定学习Ruby和RubyonRails的ASP.NETMVC开发人员。我已经有所了解并在RoR上创建了一个网站。在ASP.NETMVC上开发,我一直使用三层架构:数据层、业务层和UI(或表示)层。尝试在RubyonRails应用程序中使用这种方法,我发现没有关于它的信息(或者也许我只是找不到它?)。也许有人可以建议我如何在RubyonRails上创建或使用三层架构?附言我使用ruby1.9.3和RubyonRails3.2.3。 最佳答案 我建议在制作RoR应用程序时遵循RubyonRails(RoR)风格。Rails
我正在根据Rakefile中的现有测试文件动态生成测试任务。假设您有各种以模式命名的单元测试文件test_.rb.所以我正在做的是创建一个以“测试”命名空间内的文件名命名的任务。使用下面的代码,我可以用raketest:调用所有测试require'rake/testtask'task:default=>'test:all'namespace:testdodesc"Runalltests"Rake::TestTask.new(:all)do|t|t.test_files=FileList['test_*.rb']endFileList['test_*.rb'].eachdo|task|n
根据thispostbyStephenHagemann,我正在尝试为我的一个rake任务编写Rspec测试.lib/tasks/retry.rake:namespace:retrydotask:message,[:message_id]=>[:environment]do|t,args|TextMessage.new.resend!(args[:message_id])endendspec/tasks/retry_spec.rb:require'rails_helper'require'rake'describe'retrynamespaceraketask'dodescribe're