草庐IT

线程池的机制

JakeWharton 2023-09-13 原文
老生常谈的问题:什么是线程池?

线程池就是创建若干个可执行的线程放入一个池(容器)中,有任务需要处理时,会提交到线程池中的任务队列,处理完之后线程并不会被销毁,而是仍然在线程池中等待下一个任务。

为什么要用线程池?

1.创建/销毁线程伴随着系统开销,使用多线程过于频繁的创建/销毁线程,会很大程度上影响处理效率;这里线程池可以复用线程,线程池可以避免性能降低。
2.线程并发数量过多,抢占系统资源从而导致阻塞;这里线程池可以显示最大线程数量。
3.对线程进行一些简单的管理

讲线程池原理之前,先讲一讲线程池原理要涉及到的阻塞队列。

阻塞队列:

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这里会有两个阻塞动作:在队列为空时,获取元素的线程会阻塞来等待队列变为非空。当队列满时,存储元素的线程会阻塞来等待队列可用。

JDK提供了一个阻塞队列的接口类BlockingQueue:
插入移除有三对方法,既有阻塞方法,也有非阻塞方法。

add(e)与remove() ,如果遇到阻塞,会抛出异常
offer(e)与poll(),如果遇到阻塞,offer返回false,poll返回null
put(e)与take() ,如果遇到阻塞,会一直阻塞

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

7个阻塞队列:

ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列,可以对元素类实现comparator,依据此进行排序。
DelayQueue:一个支持延时获取元素的无界阻塞队列,如果一个元素的延迟时间没有到,元素是获取不到的。
SynchronousQueue:一个不存储元素的阻塞队列。

有界表示队列长度有限,无界表示长度无限,可以无限放入元素。这个容量会在构造方法传入capacity值来初始化最大容量。

源码分析:

这里是ThreadPoolExecutor的构造方法:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
参数意义:

int corePoolSize, 核心线程数
int maximumPoolSize, 最大线程数
long keepAliveTime, 设置空闲线程的存活时间
TimeUnit unit, 设置空闲线程的存活时间单位
BlockingQueue<Runnable> workQueue, 保存任务的阻塞队列
ThreadFactory threadFactory, 创建线程的工厂,给新建的线程赋予名字
RejectedExecutionHandler handler) 饱和/拒绝策略,内置4种策略

拒绝策略:
  • AbortPolicy :直接抛出异常,默认;
  • CallerRunsPolicy:用调用者所在的线程来执行任务
  • DiscardOldestPolicy:丢弃阻塞队列里最老的任务,队列里最靠前的任务
  • DiscardPolicy :当前任务直接丢弃
线程池工作机制图:
线程池运行机制:

当程序生产了任务之后,提交进线程池。线程池会创建出corePoolSize数量的线程执行任务。如果任务数超出了corePoolSize,那么阻塞队列派上用场了,会将任务放入到阻塞队列中。如果任务数继续增大,阻塞队列也满了,这个时候才会继续新起线程执行任务。而如果线程总数达到了maximumPoolSize,那么这个时候饱和策略就会执行了。

ExecutorService类结构:

submit:向线程池提交任务
shutdown:尝试关闭线程池,将当前没有执行任务的线程中断
shutdownNow:将当前线程所有线程立即进行中断

线程池提交任务的方式

1.excute(runnable)方法提交,提交的任务不管返回结果
2.submit(runnable,callback)方法提交,提交的任务可以得到任务执行完成的返回结果。

线程池如何配置合理线程数

(1)CPU密集型:
定义:CPU密集型的意思就是该任务需要大量运算,而没有阻塞,CPU一直全速运行。
CPU密集型任务配置尽可能少的线程数,因为线程上下文切换会耗费更多的时间。
配置线程数:CPU核数+1

(2)IO密集型:
定义:IO密集型,即该任务需要大量的IO,即大量的阻塞。
在单线程上运行IO密集型任务会导致浪费大量的CPU运算能力浪费在等待。
所以IO密集型任务中使用多线程可以大大的加速程序运行,即使在单核CPU上,这种加速主要利用了被浪费掉的阻塞时间。
配置线程数:CPU核数 * 2

CPU核心数获取方法:

 Runtime.getRuntime().availableProcessors()

各种线程池的创建方式:

1.创建缓存线程池:

        //创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

内部实现方式:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

2.创建单线程池:

 //创建一个线程的线程池,保证任务执行顺序和添加任务顺序一致
        ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();

内部实现方式:

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

3.创建定长线程池

//创建定长线程池
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

内部实现方式:

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

4.创建周期线程池

 //创建一个定长线程池,定时及周期性任务执行
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);

内部实现方式:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
使用线程池执行任务:
        threadPool.execute(new Runnable() {
            @Override
            public void run() {

            }
        });
使用线程池关闭任务:

原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程

        cachedThreadPool.shutdown();

        cachedThreadPool.shutdownNow();

验证各种线程池特性:

创建执行的任务:任务就是打印各自的"类路径+@+hashcode"。

    public static class MyRunable implements Runnable {

        public MyRunable(){

        }

        @Override
        public void run() {
            System.out.println("开始处理任务");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("当前执行任务线程" + this.toString());
        }
    }

1.循环创建任务放入CachedThreadPool中执行:

        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

        for(int i=0;i<10;i++){
            cachedThreadPool.execute(new MyRunable());
        }

打印结果:给多少个任务,就创建多少个线程,缓存线程池不限制线程大小

开始处理任务
开始处理任务
开始处理任务
开始处理任务
开始处理任务
开始处理任务
开始处理任务
开始处理任务
开始处理任务
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@4ee88e47
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@51dbc572
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@6350da9f
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@34258bc7
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@2dd943ad
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@786a52e0
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@b89fbe7
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@834fe66
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@52922937
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@3d339f20

2.循环创建任务放入SingleThreadPool中执行:

        ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();

        for(int i=0;i<10;i++){
            singleThreadPool.execute(new MyRunable());
        }

当前只有一个线程处理任务,任务按照加入顺序依序执行:

开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@35b893c9
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@3b7f6b0e
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@3f40f8b9
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@7a426135
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@b95836f
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@4bf2be08
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@73cf02ff
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@1a6d9e7a
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@3d44e8a3
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@639803fd

3.循环创建任务放入FixedThreadPool中执行:
定长线程池,设置最大线程数为3;

        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        for(int i=0;i<10;i++){
            fixedThreadPool.execute(new MyRunable());
        }

最多能有3个线程同时处理任务:

开始处理任务
开始处理任务
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@f1c2027
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@464fea81
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@72e3cfe6
开始处理任务
开始处理任务
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@48881c38
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@7c242fca
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@1dbcfb4
开始处理任务
开始处理任务
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@4cb781f1
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@5e44b57
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@781ba496
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@9b51603

4.循环创建任务放入ScheduledThreadPool中执行:

        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
        for(int i=0;i<10;i++){
            scheduledExecutorService.schedule(new MyRunable(),4, TimeUnit.SECONDS);
        }

也是需要固定线程大小,不过比fixed多了延迟执行。

开始处理任务
开始处理任务
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@2fec69f9
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@1188fc84
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@7b1e0a3d
开始处理任务
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@1f4f00c5
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@75d9fb2e
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@2bd3e8dc
开始处理任务
开始处理任务
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@71e6614d
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@1b7097f4
开始处理任务
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@6ed163c2
当前执行任务线程com.example.apple.threadpooluse.MainActivity$MyRunable@34e4f115
这么多种线程池,如何合理地配置线程池?

根据任务特性:

  • CPU密集型:
    (1)高并发、任务执行时间短的业务,如纯粹的计算,线程池线程数可以设置为CPU核数+1,不能再多了,能减少线程上下文的切换
Runtime.getRuntime().availableProcessors()   //获取CPU核心数
  • IO密集型:
    并发不高、任务执行时间长的业务,如IO操作,网络请求,线程容易停下来,不要让所有的CPU闲下来,可以适当加大线程池中的线程数目,让CPU处理更多的业务
    推荐配置线程数不要超过整体的CPU核心数的2倍。

有关线程池的机制的更多相关文章

  1. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  2. ruby - 如何让Ruby捕获线程中的语法错误 - 2

    我正在尝试使用ruby​​编写一个双线程客户端,一个线程从套接字读取数据并将其打印出来,另一个线程读取本地数据并将其发送到远程服务器。我发现的问题是Ruby似乎无法捕获线程内的错误,这是一个示例:#!/usr/bin/rubyThread.new{loop{$stdout.puts"hi"abc.putsefsleep1}}loop{sleep1}显然,如果我在线程外键入abc.putsef,代码将永远不会运行,因为Ruby将报告“undefinedvariableabc”。但是,如果它在一个线程内,则没有错误报告。我的问题是,如何让Ruby捕获这样的错误?或者至少,报告线程中的错误?

  3. ruby - 如何在 ruby​​ 中运行后台线程? - 2

    我是ruby​​的新手,我认为重新构建一个我用C#编写的简单聊天程序是个好主意。我正在使用Ruby2.0.0MRI(Matz的Ruby实现)。问题是我想在服务器运行时为简单的服务器命令提供I/O。这是从示例中获取的服务器。我添加了使用gets()获取输入的命令方法。我希望此方法在后台作为线程运行,但该线程正在阻塞另一个线程。require'socket'#Getsocketsfromstdlibserver=TCPServer.open(2000)#Sockettolistenonport2000defcommandsx=1whilex==1exitProgram=gets.chomp

  4. ruby - Rails 开发服务器、PDFKit 和多线程 - 2

    我有一个使用PDFKit呈现网页的pdf版本的Rails应用程序。我使用Thin作为开发服务器。问题是当我处于开发模式时。当我使用“bundleexecrailss”启动我的服务器并尝试呈现任何PDF时,整个过程会陷入僵局,因为当您呈现PDF时,会向服务器请求一些额外的资源,如图像和css,看起来只有一个线程.如何配置Rails开发服务器以运行多个工作线程?非常感谢。 最佳答案 我找到的最简单的解决方案是unicorn.geminstallunicorn创建一个unicorn.conf:worker_processes3然后使用它:

  5. ruby - Ruby 1.9.1 中的 native 线程,对我有什么好处? - 2

    所以,Ruby1.9.1现在是declaredstable.Rails应该与它一起工作,并且正在慢慢地将gem移植到它。它具有native线程和全局解释器锁(GIL)。自从GIL到位后,原生线程是否比1.9.1中的绿色线程有任何优势? 最佳答案 1.9中的线程是原生的,但它们被“放慢了速度”,一次只允许一个线程运行。这是因为如果线程真的并行运行,它会混淆现有代码。优点:IO现在在线程中是异步的。如果一个线程阻塞在IO上,那么另一个线程将继续执行直到IO完成。C扩展可以使用真正的线程。缺点:任何非线程安全的C扩展都可能存在使用Thre

  6. ruby - 使写入文件线程安全 - 2

    我在一个ruby​​文件中有一个函数可以像这样写入一个文件File.open("myfile",'a'){|f|f.puts("#{sometext}")}这个函数在不同的线程中被调用,使得像上面这样的文件写入不是线程安全的。有谁知道如何以最简单的方式使这个文件写入线程安全?更多信息:如果重要的话,我正在使用rspec框架。 最佳答案 您可以通过File#flock给锁File.open("myfile",'a'){|f|f.flock(File::LOCK_EX)f.puts("#{sometext}")}

  7. Ruby 线程与 Watir - 2

    我编写了几个类来控制我想如何处理多个网站,两者都使用类似的方法(即登录、刷新)。每个类都打开自己的WATIR浏览器实例。classSite1definitialize@ie=Watir::Browser.newenddeflogin@ie.goto"www.blah.com"endend无线程的main中的代码示例如下require'watir'require_relative'site1'agents=[]agents这工作正常,但在当前代理完成登录之前不会移动到下一个代理。我想合并多线程来处理这个问题,但似乎无法让它工作。require'watir'require_relative

  8. ruby - 在多个线程中引用类方法会导致自动加载循环依赖崩溃 - 2

    代码:threads=[]Thread.abort_on_exception=truebegin#throwexceptionsinthreadssowecanseethemthreadseputs"EXCEPTION:#{e.inspect}"puts"MESSAGE:#{e.message}"end崩溃:.rvm/gems/ruby-2.1.3@req/gems/activesupport-4.1.5/lib/active_support/dependencies.rb:478:inload_missing_constant':自动加载常量MyClass时检测到循环依赖稍加研究后,

  9. Ruby 多线程/多处理读物 - 2

    任何人都可以推荐任何详细介绍Ruby多线程/多处理的复杂性的好的多线程/处理书籍/网站吗?我尝试使用ruby​​线程,基本上在1.9vm上的无死锁代码中它在jruby中遇到了死锁。是的,我意识到差异很大(jruby没有GIL),但我想知道是否有用于ruby​​中多线程编程的策略或类集,我只需要继续阅读。旁注:从java到ruby​​必须定义是否需要重新输入锁,这有点奇怪。 最佳答案 如果你使用Ruby1.9,你可以试试Fiber,它是Ruby中线程的一大改进http://ruby-doc.org/core-1.9/classes/F

  10. ruby - 跨线程共享枚举器 - 2

    我想从不同线程调用一个公共(public)枚举器。当我执行以下操作时,enum=(0..1000).to_enumt1=Thread.newdopenum.nextsleep(1)endt2=Thread.newdopenum.nextsleep(1)endt1.joint2.join它引发了一个错误:Fibercalledacrossthreads.当enum在从t1调用一次后从t2调用时。为什么Ruby设计为不允许跨线程调用枚举器(或纤程),以及是否有其他方法可以提供类似的功能?我猜测枚举器/纤程上的操作的原子性在这里是相关的,但我不完全确定。如果这是问题所在,那么在使用时独占锁定

随机推荐