目录
3、Executor,Executors,ExecutorService
今天在读项目代码的过程中发现了项目中有CompletableFuture的使用,虽然很早就知道这个类,也会使用但是从来没有探究代码的实现逻辑,今天凑着一个机会从里到外扒一扒这个类,希望能讲明白。
先讲一讲线程,我想刚入门的同学都知道线程是什么,线程是为了提升cpu利用效率,防止阻塞的执行单位,举个例子,比如你正在做饭,发现家里没有酱油了,这时候你有两个选择,一个是停下手里的活,自己去打酱油,另外一个方式就是叫你儿子去打酱油,你儿子就是相当于一个新线程,帮你解决问题。打酱油这件事情就是一个任务。
所有的事情都可以从生活中发现蛛丝马迹,从上面的例子中我们可以看到,线程就是一个执行单位,可以具象理解为就是一个人,而要做的事情可以理解为一件任务,所以迁移到程序中我们可以理解线程。
Thread -> 人
Runnable->没有回复的任务
callable -> 有回复的任务
Runnable 和 callable 的不同就是有没有回复结果,比如在开发中发出命令不需要回复,当存储数据库的时候不关注结果,只是发出动作可以使用runnable,比如第一个例子的打酱油,是需要计算结果的,这个时候使用callable是合适的。
通过继承Thread 创建线程,覆盖实现run方法,将任务代码写在run内,通过start 启动线程。
class MyThread extends Thread{ // 继承Thread类,作为线程的实现类
private String name ; // 表示线程的名称
public MyThread(String name){
this.name = name ; // 通过构造方法配置name属性
}
public void run(){ // 覆写run()方法,作为线程任务
for(int i=0;i<10;i++){
System.out.println(name + "运行,i = " + i) ;
}
}
};
public class ThreadDemo{
public static void main(String args[]){
MyThread mt1 = new MyThread("香菜A ") ; // 实例化对象
MyThread mt2 = new MyThread("香菜B ") ; // 实例化对象
mt1.start() ; // 开始执行任务吧
mt2.start() ; // 开始执行任务吧
}
}
Runnable 是一个接口,也就是一个规范,定义了任务的基本方法run,这个在有些也叫契约(垃圾概念)
这种方式是任务和线程进行分离,在启动线程的时候告诉他执行什么任务。只定义任务,随便来个线程都可以执行
class MyThread implements Runnable{ // 实现Runnable接口
public void run(){ // 覆写run()方法
while(true){
System.out.println(Thread.currentThread().getName() + "在运行。") ;
}
}
};
public class ThreadDemo{
public static void main(String args[]){
MyThread mt = new MyThread() ; // 实例化Runnable子类对象
Thread t = new Thread(mt,"线程"); // 实例化Thread对象
t.setDaemon(true) ; // 此线程在后台运行
t.start() ; // 启动线程
}
};
实现 Callable 接口, 相较于实现 Runnable 接口的方式,方法可以有返回值,并且可以抛出异常。
使用Callable 需要有 FutureTask的支持,再次使用上面打酱油的例子,你在让你儿子打酱油的时候说,等会打酱油回来之后可以直接放在桌子的左上角,你在做饭的过程中只要发现左上角有酱油就代表任务完成了。
FutureTask 可以理解为约定的一个地方,线程执行完之后就会把结果放在FutureTask的result容器中。具体的可以参照:《多线程系列二》不理解future怎么能有future?
public class TestCallable {
public static void main(String[] args) {
ThreadDemo td = new ThreadDemo();
//1.执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。
FutureTask<Integer> result = new FutureTask<>(td);
new Thread(result).start();
//2.接收线程运算后的结果
try {
Integer sum = result.get(); //FutureTask 可用于 闭锁 类似于CountDownLatch的作用,在所有的线程没有执行完成之后这里是不会执行的
System.out.println(sum);
System.out.println("------------------------------------");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
class ThreadDemo implements Callable<Integer> {
@Override
public Integer call() throws Exception {
int sum = 0;
for (int i = 0; i <= 100000; i++) {
sum += i;
}
return sum;
}
}
小结:在理解多线程的时候主要映射到现实生活的需求中,Thread 就是一个人,需要做的事情就是任务,任务分为有回复的任务和无回复的任务,也就分出Runnable和Callable 了。
第二个话题其实是Java8 的新特性,在之前的工作经历中,发现还有很多人停留在Java6的语法上,不愿意学习也不愿意接受新的语法特性,究其原因是没理解,不能很好的掌握,这个我也写过一篇文章,里面有详细的说明。
一篇文章掌握lambda,function下41个类 我相信读完一定能一下掌握所有的function。
简单一句话概括,所有的function功能上和定义的函数一样,只不过定义了通用的结构,相当于通用的规则,只要填写代码就可以了,而这一组函数按不同的需求可以分为
Function 有返回值的函数,使用accept函数
Consumer 必须有参数的函数,使用apply函数
Supplier 有返回值的函数,使用get函数

线程池这个大家都知道,是为了提高效率,可以类比生活,如果开个店,需要几个员工,正常的操作都是雇佣员工,而不是每天使用临时工,这样用完就解雇掉,对于店主来说招人的成本太高,还需要培训,我想正常的都不会这么做,线程池也是同样的道理,避免了创建和销毁线程的开销。
java线程池中的一个顶级接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类,一般来说,在Executor中,可以使用Executor而不用显示地创建线程:
executor.execute(new RunnableTask()); // 异步执行
ExecutorService:是一个比Executor使用更广泛的子类接口,提供了生命周期管理的方法,返回 Future 对象,以及可跟踪一个或多个异步任务执行状况返回Future的方法;可以调用ExecutorService的shutdown()方法来关闭 ExecutorService,调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程。
Executors是个静态工厂类。它通过静态工厂方法返回ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 等类的对象。

newScheduledThreadPool 定时执行的线程池
newCachedThreadPool 缓存使用过的线程
newFixedThreadPool 固定数量的线程池
newWorkStealingPool 将大任务分解为小任务的线程池
可以看到提供了一些具体的线程池模型,可以根据自己的需求使用。
终于到我今天想说的了,上面的基本上都可以在我的博客中找到相关的专题,今天主要聊一下CompletableFuture。
CompletionStage 的接口一般都返回新的CompletionStage,表示执行完一些逻辑后,生成新的CompletionStage,构成链式的阶段型的操作。
CompletionStage的接口方法可以从多种角度进行分类,可
以从函数的命名和函数进行分类
每个函数名可以拆分为三段,第一个单词表示触发时机,第二个表述stage之间的关系,最后一个表示执行的方式
从函数的参数可以看到主要是有返回值的function,消费型的Consumer以及runnable
先来个例子吧,可以运行下面的例子看下打酱油的全过程,并不影响你做菜。
public static void main(String[] args) throws InterruptedException {
CompletableFuture
//让儿子去打酱油
.supplyAsync(()-> {
try {
System.out.println("儿子跑步去打酱油");
TimeUnit.SECONDS.sleep(1);
System.out.println("酱油打好了");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "酱油";
})
//告诉我一声
.thenAccept(oil->{
System.out.println("做菜用酱油:" + oil);
});
System.out.println("继续做菜");
Thread.currentThread().join();
}
一个completetableFuture就代表了一个任务,看名字就知道和Future的关系。使用future需要等待isDone为true才能知道任务跑完了。或者就是用get方法调用的时候会出现阻塞。而使用completableFuture的使用就可以用then,when等等操作来防止以上的阻塞和轮询isDone的现象出现。
从上面的源码我们看到并没有我们之前知道的线程池相关的东西,也没使用线程池,到底是怎么做的呐?
打开源码瞧一瞧。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
Supplier 是一个有返回值的函数,在上面的例子中我们反悔了一个酱油的字符串。
在上面的return语句中我们看到使用了一个asyncPool,这是个什么东西?我们找下定义
/**
* Default executor -- ForkJoinPool.commonPool() unless it cannot
* support parallelism.
*/
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
从上面的代码注释中我们看到,这是一个缺省的线程池,使用ForkJoinPool.commonPool(),这下我们明白了原来是使用了缺省的线程池。这个线程池默认创建的线程数是 CPU 的核数
CompletionStage的作用就是为了链式编程而存在的,所以可以猜测下CompletionStage 扮演了重要的作用,看下源码吧。
//1
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
//2
static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
if (f == null) throw new NullPointerException();
CompletableFuture<U> d = new CompletableFuture<U>();
e.execute(new AsyncSupply<U>(d, f));
return d;
}
asyncSupplyStage()方法中,调用指定的线程池,并执行execute(new AsyncSupply(d,f)),这里d就是我们的“源任务”,接下来thenApply()要依赖着这个源任务进行后续逻辑操作,f是Supplier的函数式编程。
static final class AsyncSupply<T> extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
CompletableFuture<T> dep; Supplier<T> fn;
AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
public final boolean exec() { run(); return true; }
public void run() {
CompletableFuture<T> d; Supplier<T> f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) {
try {
d.completeValue(f.get());
} catch (Throwable ex) {
d.completeThrowable(ex);
}
}
d.postComplete();
}
}
}
在run()方法里。在run()方法里,先判断d.result == null,判断该任务是否已经完成,防止并发情况下其他线程完成此任务了。f.get()就是调用的Supplier的函数式编程。
主线程会在asyncSupplyStage()方法中返回d,就是我们的“依赖任务”,而这个任务此时还处在阻塞中。接下来main线程会继续执行CompletableFuture的thenAccept(Comsumer<? super T> action)方法,然后调用CompletableFuture的uniAcceptStage()方法。
CompletableFuture中有“源任务”和“依赖任务”,“源任务”的完成能够触发“依赖任务”的执行,这里的完成可以是返回正常结果或者是异常。
CompletableFuture 是一个自带缺省线程池的,并且支持链式编程的,免去线程之间关系的类,在多线程编程中可以减少代码量,减少线程的调用,推荐
码字不易,求三连,求支持,为爱发电
我试图获取一个长度在1到10之间的字符串,并输出将字符串分解为大小为1、2或3的连续子字符串的所有可能方式。例如:输入:123456将整数分割成单个字符,然后继续查找组合。该代码将返回以下所有数组。[1,2,3,4,5,6][12,3,4,5,6][1,23,4,5,6][1,2,34,5,6][1,2,3,45,6][1,2,3,4,56][12,34,5,6][12,3,45,6][12,3,4,56][1,23,45,6][1,2,34,56][1,23,4,56][12,34,56][123,4,5,6][1,234,5,6][1,2,345,6][1,2,3,456][123
我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/
当我的预订模型通过rake任务在状态机上转换时,我试图找出如何跳过对ActiveRecord对象的特定实例的验证。我想在reservation.close时跳过所有验证!叫做。希望调用reservation.close!(:validate=>false)之类的东西。仅供引用,我们正在使用https://github.com/pluginaweek/state_machine用于状态机。这是我的预订模型的示例。classReservation["requested","negotiating","approved"])}state_machine:initial=>'requested
我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("
我有这个html标记:我想得到这个:我如何使用Nokogiri做到这一点? 最佳答案 require'nokogiri'doc=Nokogiri::HTML('')您可以通过xpath删除所有属性:doc.xpath('//@*').remove或者,如果您需要做一些更复杂的事情,有时使用以下方法遍历所有元素会更容易:doc.traversedo|node|node.keys.eachdo|attribute|node.deleteattributeendend 关于ruby-Nokog
我想获取模块中定义的所有常量的值:moduleLettersA='apple'.freezeB='boy'.freezeendconstants给了我常量的名字:Letters.constants(false)#=>[:A,:B]如何获取它们的值的数组,即["apple","boy"]? 最佳答案 为了做到这一点,请使用mapLetters.constants(false).map&Letters.method(:const_get)这将返回["a","b"]第二种方式:Letters.constants(false).map{|c
我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www
我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht
这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/