11.定时任务&定时线程池详解
当我们不用任务框架时,我们想自己写一个定时任务时,我们能想起那个工具类呢?Timer ?还有吗?不知道了,下面我们要讲下ScheduledThreadPoolExecutor,定时任务线程池,可以执行一次任务,还可以执行周期性任务。
定时线程池的类的结构图如下:

从结构图上可以看出定时线程池ScheduledThreadPoolExecutor继承了线程池ThreadPoolExecutor,也就是说它们之间肯定有相同的行为和属性。
ScheduledThreadPoolExecutor常用发的方法如下:
1)schedule():一次行任务,延迟执行,任务只执行一次。
2)scheduleAtFixedRate():周期性任务,不不等待任务结束,每隔周期时间执行一次,新任务放进队列中.
3)scheduleWithFixedDelay():周期性任务,等待任务结束,每隔周期时间执行一次.
代码样例入下:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestScheduledThreadPoolExecutor {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
//无返回值 延迟5秒返回
scheduledThreadPoolExecutor.schedule(()->{
System.out.println("我要延迟5秒执行,只执行一次 ");
},5000, TimeUnit.MICROSECONDS);
}
}
可以用在启动项目时需要等待对象的加载,延迟执行一个任务。
带返回值的延迟执行任务如下:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestScheduledThreadPoolExecutor {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
//有返回值任务 可以用作异步处理任务不用等待结果
ScheduledFuture<Integer> future = scheduledThreadPoolExecutor.schedule(()->{
System.out.println("我要延迟5秒执行,只执行一次 ");
return 1;
},5000, TimeUnit.MICROSECONDS);
System.out.println(future.get());
}
}
待返回值的任务,可以用于异步处理一个任务,等主线任务执行完,主要任务要知道异步任务的执行状态。
周期性任务:参数一样,方法名字不一样 例子如下
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestScheduledThreadPoolExecutor {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
//周期性的任务 发心跳 service1-service2 每次5s,发送一个心跳 下面的例子是不管任务是否执行完,一直想队列中放。 一个任务占一个线程。
//scheduledThreadPoolExecutor.scheduleAtFixedRate(()->{
//等待任务执行结束,在间隔2秒执行。
scheduledThreadPoolExecutor.scheduleWithFixedDelay(()->{
System.out.println("send heart beat");
long startTime = System.currentTimeMillis(),nowTime = startTime;
while((nowTime-startTime)<5000){
nowTime = System.currentTimeMillis();
try{
Thread.sleep(100);
}catch (InterruptedException e ){
e.printStackTrace();
}
}
System.out.println("task over .....");
//任务启动多久之后 ,周期 每2s执行一次,时间单位
},1000,2000,TimeUnit.MILLISECONDS);
}
}
当使用setnx获取分布式锁(锁是有失效时间的),但是害怕任务没有执行完成锁失效了,怎么办呢?可以在任务的开始用一个定时线程池每隔一段时间看下锁是否失效如果没失效延长失效时间,如果失效不做处理。这样可以保证任务执行完成。
服务注册客户端每隔多久向服务中心发送下自己的ip,端口,服务名字及服务状态。
import java.util.Timer;
import java.util.TimerTask;
public class TestTimer {
public static void main(String[] args) throws InterruptedException {
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
System.out.println("send star -----");
throw new RuntimeException("2134 243");
}
}, 1000, 2000);
}
}
上面是的使用方法,从使用方法上和定时线程池的使用方法类似,都是周期性的执行任务;
不同的地方是:
Timer:单线程,线程挂了,不会再创建线程执行任务;
ScheduledThreadPoolExecutor:线程挂了,再提交任务,线程池会创建新的线程执行任务。
线程池执行过程:调用sechedule相关方法时,会先把任务添加到队列中,再又线程从队列中取出执行。

它接收SchduledFutureTask类型的任务,是线程调度的最小单位,有三种提交方法:
1)schedule():一次行任务,延迟执行,任务只执行一次。
2)scheduleAtFixedRate():周期性任务,不不等待任务结束,每隔周期时间执行一次,新任务放进队列中.
3)scheduleWithFixedDelay():周期性任务,等待任务结束,每隔周期时间执行一次.
它采用DelayedWorkQueue存储等待的任务:
1)DelayedWorkQueue内部封装了一个PriorityQueue,根据它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;
2)DelayedWorkQueue是一个无界队列;
SchduledFutureTask 接收的参数(成员变量):
1)private long time :任务开始的时间;
2)private final long sequenceNumber:任务的序号;
3)private final long period:任务执行的间隔;
工作线程的执行 过程:
ScheduledThreadPoolExecutor会把执行的任务放到工作队列DelayedQueue中,DelayedQueue封装了一个PriorityQueue,PriorityQueue会对队列中的SchduledFutureTask 进行排序,具体的排序算法如下:
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
1)首先按照time排序,time小的排在前面,time大的排在后面;
2)如果time相同,按照sequenceNumber排序,sequenceNumber小的排在前面,sequenceNumber大的排在后面。如果两个task的执行时间相同,优先执行先提交的task.
ScheduledFutureTaskn的run方法实现:
run方法是调度task的核心,task 的执行实际是run方法的执行。
public void run() {
boolean periodic = isPeriodic();
//如果当前线程池已经不支持执行任务,则取消
if (!canRunInCurrentRunState(periodic))
cancel(false);
//如果不需要周期性执行,则直接执行run方法
else if (!periodic)
ScheduledFutureTask.super.run();
//如果需要周期性执行,先执行,后设置下次执行时间
else if (ScheduledFutureTask.super.runAndReset()) {
//计算下次执行时间
setNextRunTime();
//再次将执行任务添加到队列中,重复执行。
reExecutePeriodic(outerTask);
}
}
}
reExecutePeriodic 源码如下:
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
该方法和delayExecute方法类似,不同的是:
1)由于调用reExecutePeriodic 方法时已经执行过一次周期性任务了,所以不会reject当前任务;
2)传入的任务一定是周期性任务
首先是schedule方法,该方法指任务在指定延迟时间到达后触发,只会执行一次。
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
//参数校验
if (callable == null || unit == null)
throw new NullPointerException();
//这是一个嵌套结构,首先把用户提交的任务包装成ScheduledFutureTask
//然后在调用decorateTask进行包装,该方法是留给用户去扩展的,默认是个空方法。
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
scheduleWithFixedDelay周期性执行任务:
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
// 将任务包装成 ScheduledFutureTask 类型
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
// 再次装饰任务,可以复写 decorateTask 方法,定制化任务
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 放入延时队列中,ScheduledFutureTask 是接口 RunnableScheduledFuture 的一个实现类
// 所以放入队列还是 ScheduledFutureTask 类型的
delayedExecute(t);
return t;
}
任务提交方法delayedExecute源码如下:
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果线程池已经关闭,则 使用决绝策略把提交任务拒绝掉
if (isShutdown())
reject(task);
else {
//与ThreadPoolExecutor不同的,这里直接把任务加入延迟队列
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
//如果当前状态无法执行,则取消
remove(task))
task.cancel(false);
else
//这里增加了一个worker线程,避免提交的任务没有worker去执行
//原因就是该类没有像ThreadPoolExecutor 一样,核心worker满了,才放入队列
ensurePrestart();
}
}
ScheduledThreadPoolExecutor之所以要在自己实现阻塞的工作队列,是因为ScheduledThreadPoolExecutor要求的工作队列有些特殊。
DelayedWorkerQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务执行时间都不同,所以DelayedWorkerQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的qianmian(注意:这里的顺序并不是绝对的,堆中的排序只保证了自己的下次执行时间要比父节点的下次执行时间要大,而叶子节点之间并不是顺序的。)
堆结构图如下

可知,DelayedWorkerQueue是一个基于最小堆结构的队列。堆结构可以使用数组表示,可以转换成如下的数组

在这种结构中,可以发下如下特点:
假设索引值从0开始,子节点的索引值为K,父节点的索引值为P,则:
为什么要使用DelayedWorkerQueue呢?
定时任务执行时需要取出最近要执行的任务,所以任务在队列中每次出队时,一定要是当前队列中执行时间最靠前的,所以自然要使用优先级队列。
DelayedWorkerQueue是一个优先级队列,它可以保证每次出队列的任务都是当前队列中执行时间最靠前的,由于它是基于堆结构的队列,堆结构在执行插入和删除操作时的最坏时间复杂度是O(logN).
DelayedWorkerQueue的属性:
//队列初始化容量
private static final int INITIAL_CAPACITY = 16;
//根据初始化容量创建RunnableScheduledFuture 类型的数组;
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
// leader 线程
private Thread leader = null;
// 当较新的任务在队列的头部可用时,或者新线程可能需要成为leader,则通过该条件发出信号
private final Condition available = lock.newCondition();
注意:这里的leader,它是Leader-Follower模式的变体,用于减少不必要的定时等待。什么意思呢?对于多线程的网络模型来说 所有线程会有三种身份中的一种:leader和follower,以及一个干活中的状态:proccesser。它的基木原则就是,永远最多只有一个leader,而所有follower都在等待成为leader。线程池启动时会自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程将其提拔为新的Leader,然后自己就去干活了,去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待下次成为Leader。这种方法可以增强CPU高速缓存相似性,及消除动态内存分配和线程间的数据交换。
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
我正在尝试测试是否存在表单。我是Rails新手。我的new.html.erb_spec.rb文件的内容是:require'spec_helper'describe"messages/new.html.erb"doit"shouldrendertheform"dorender'/messages/new.html.erb'reponse.shouldhave_form_putting_to(@message)with_submit_buttonendendView本身,new.html.erb,有代码:当我运行rspec时,它失败了:1)messages/new.html.erbshou
我在从html页面生成PDF时遇到问题。我正在使用PDFkit。在安装它的过程中,我注意到我需要wkhtmltopdf。所以我也安装了它。我做了PDFkit的文档所说的一切......现在我在尝试加载PDF时遇到了这个错误。这里是错误:commandfailed:"/usr/local/bin/wkhtmltopdf""--margin-right""0.75in""--page-size""Letter""--margin-top""0.75in""--margin-bottom""0.75in""--encoding""UTF-8""--margin-left""0.75in""-
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我有一个对象has_many应呈现为xml的子对象。这不是问题。我的问题是我创建了一个Hash包含此数据,就像解析器需要它一样。但是rails自动将整个文件包含在.........我需要摆脱type="array"和我该如何处理?我没有在文档中找到任何内容。 最佳答案 我遇到了同样的问题;这是我的XML:我在用这个:entries.to_xml将散列数据转换为XML,但这会将条目的数据包装到中所以我修改了:entries.to_xml(root:"Contacts")但这仍然将转换后的XML包装在“联系人”中,将我的XML代码修改为
为了将Cucumber用于命令行脚本,我按照提供的说明安装了arubagem。它在我的Gemfile中,我可以验证是否安装了正确的版本并且我已经包含了require'aruba/cucumber'在'features/env.rb'中为了确保它能正常工作,我写了以下场景:@announceScenario:Testingcucumber/arubaGivenablankslateThentheoutputfrom"ls-la"shouldcontain"drw"假设事情应该失败。它确实失败了,但失败的原因是错误的:@announceScenario:Testingcucumber/ar
我在我的项目中添加了一个系统来重置用户密码并通过电子邮件将密码发送给他,以防他忘记密码。昨天它运行良好(当我实现它时)。当我今天尝试启动服务器时,出现以下错误。=>BootingWEBrick=>Rails3.2.1applicationstartingindevelopmentonhttp://0.0.0.0:3000=>Callwith-dtodetach=>Ctrl-CtoshutdownserverExiting/Users/vinayshenoy/.rvm/gems/ruby-1.9.3-p0/gems/actionmailer-3.2.1/lib/action_mailer
我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server
在MRIRuby中我可以这样做:deftransferinternal_server=self.init_serverpid=forkdointernal_server.runend#Maketheserverprocessrunindependently.Process.detach(pid)internal_client=self.init_client#Dootherstuffwithconnectingtointernal_server...internal_client.post('somedata')ensure#KillserverProcess.kill('KILL',
如何使用RSpec::Core::RakeTask初始化RSpecRake任务?require'rspec/core/rake_task'RSpec::Core::RakeTask.newdo|t|#whatdoIputinhere?endInitialize函数记录在http://rubydoc.info/github/rspec/rspec-core/RSpec/Core/RakeTask#initialize-instance_method没有很好的记录;它只是说:-(RakeTask)initialize(*args,&task_block)AnewinstanceofRake