java线程池技术二:ThreadPoolExecutor实现原理

Published on 2025-01-26 23:54 in 分类: 博客 with 狂盗一枝梅
分类: 博客

本篇文章将讲解ThreadPoolExecutor线程池,它是Executors类newFixedThreadPool方法以及newCachedThreadPool方法直接创建的线程池。

线程池-线程池类图.drawio

一、线程池的标准创建方式

大部分企业的开发规范都会禁止使用快捷线程池,要求通过标准构造器ThreadPoolExecutor去构造工作线程池,原因在上一节已经讲过,每种线程池都有自己的问题,没有限制长度的阻塞队列存在将系统资源耗尽的风险。Executors工厂类中创建线程池的newFixedThreadPool方法以及newCachedThreadPool方法实际上是调用ThreadPoolExecutor线程池的构造方法完成的。ThreadPoolExecutor构造方法有多个重载版本,最终,所有的构造器都会调用下面这个大而全的构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

构造方法有7个参数,虽然参数很多,但是都是有用的:

  • corePoolSize:核心线程数量,不管是否空闲,核心线程始终保持存活状态,除非设置了allowCoreThreadTimeOut,设置了该值为true以后,keepAliveTime策略将也适用于核心线程。

  • maximumPoolSize:线程池中允许存在的最大线程数量

  • keepAliveTime:最大存活时间,超出corePoolSize的线程在未接到新任务前,最长等待多少时间;超出这个时间还是空闲,线程将会被回收

  • unit:最大存活时间的时间单位

  • workQueue:提交新任务后,如果线程池核心线程都是非空闲状态,新任务将在workQueue队列中等待;等待队列已满的时候如果线程数量未达到maximumPoolSize则创建新线程执行任务。

  • threadFactory:创建线程的工厂

  • handler:当workQueue队列已满,而且线程池中线程中线程数量已经达到了maximumPoolSize,线程池将无法处理新提交的任务,handler即为拒绝策略。

1、流程图

创建标准线程池的参数很多,下面使用流程图更加清晰地说明各个参数的作用

image-20250122162804601

2、阻塞队列

阻塞队列参数类型是BlockingQueue,它之所以被称为“阻塞”队列,正是因为阻塞队列与普通队列相比有一个重要的特点:在阻塞队列为空时会阻塞当前线程的元素获取操作。具体来说,在一个线程从一个空的阻塞队列中获取元素时线程会被阻塞,直到阻塞队列中有了元素;当队列中有元素后,被阻塞的线程会自动被唤醒。

Java线程池使用BlockingQueue实例暂时接收到的异步任务,BlockingQueue是JUC包的一个超级接口,比较常用的实现类有:

(1)ArrayBlockingQueue:是一个数组实现的有界阻塞队列(有界队列),队列中的元素按FIFO排序。ArrayBlockingQueue在创建时必须设置大小,接收的任务超出corePoolSize数量时,任务被缓存到该阻塞队列中,任务缓存的数量只能为创建时设置的大小,若该阻塞队列已满,则会为新的任务创建线程,直到线程池中的线程总数大于maximumPoolSize。

(2)LinkedBlockingQueue:是一个基于链表实现的阻塞队列,按FIFO排序任务,可以设置容量(有界队列),不设置容量则默认使用Integer.Max_VALUE作为容量(无界队列)。该队列的吞吐量高于ArrayBlockingQueue。

如果不设置LinkedBlockingQueue的容量(无界队列),当接收的任务数量超出corePoolSize时,则新任务可以被无限制地缓存到该阻塞队列中,直到资源耗尽。有两个快捷创建线程池的工厂方法Executors.newSingleThreadExecutor和Executors.newFixedThreadPool使用了这个队列,并且都没有设置容量(无界队列)。

(3)PriorityBlockingQueue:是具有优先级的无界队列。

(4)DelayQueue:这是一个无界阻塞延迟队列,底层基于PriorityBlockingQueue实现,队列中每个元素都有过期时间,当从队列获取元素(元素出队)时,只有已经过期的元素才会出队,队列头部的元素是过期最快的元素。快捷工厂方法Executors.newScheduledThreadPool所创建的线程池使用此队列。

(5)SynchronousQueue:(同步队列)是一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程的调用移除操作,否则插入操作一直处于阻塞状态,其吞吐量通常高于LinkedBlockingQueue。快捷工厂方法Executors.newCachedThreadPool所创建的线程池使用此队列。与前面的队列相比,这个队列比较特殊,它不会保存提交的任务,而是直接新建一个线程来执行新来的任务。

3、线程工厂

ThreadFactory是Java线程工厂接口,这是一个非常简单的接口,具体如下:

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

在某些构造方法中没有指定该构造方法,实际上会使用默认的线程工厂类:DefaultThreadFactory

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
		//线程名格式被设置为pool-{poolNum}-thread-{threadNum}的形式
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

要想自定义一个线程工厂,只需要实现ThreadFactory接口中的newThread方法就好。

4、拒绝策略

在线程池的任务缓存队列为有界队列(有容量限制的队列)的时候,如果队列满了,提交任务到线程池的时候就会被拒绝。总体来说,任务被拒绝有两种情况:

(1)线程池已经被关闭。

(2)工作队列已满且maximumPoolSize已满。

无论以上哪种情况任务被拒绝,线程池都会调用RejectedExecutionHandler实例的rejectedExecution方法。RejectedExecutionHandler是拒绝策略的接口,JUC为该接口提供了以下几种实现:

  • AbortPolicy:拒绝策略。

  • DiscardPolicy:抛弃策略。

  • DiscardOldestPolicy:抛弃最老任务策略。

  • CallerRunsPolicy:调用者执行策略。

这四个类指定定义在ThreadPoolExecutor内部:

image-20250122170320162

除此之外,还可以自定义拒绝策略。

(1)AbortPolicy

使用该策略时,如果线程池队列满了,新任务就会被拒绝,并且抛出RejectedExecutionException异常。该策略是线程池默认的拒绝策略。

public static class AbortPolicy implements RejectedExecutionHandler {

    public AbortPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

可以看到其处理方式非常简单,就是直接抛出异常。

(2)DiscardPolicy

该策略是AbortPolicy的Silent(安静)版本,如果线程池队列满了,新任务就会直接被丢掉,并且不会有任何异常抛出。

    public static class DiscardPolicy implements RejectedExecutionHandler {

        public DiscardPolicy() { }
        
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

从DiscardPolicy源码中可以看到,其拒绝策略是空方法,所以不会做处理而且不会抛出异常

(3)DiscardOldestPolicy

抛弃最老任务策略,也就是说如果队列满了,就会将最早进入队列的任务抛弃,从队列中腾出空间,再尝试加入队列。因为队列是队尾进队头出,队头元素是最老的,所以每次都是移除队头元素后再尝试入队。

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            //移除等待队列的头部节点
            e.getQueue().poll();
            //重新尝试提交任务
            e.execute(r);
        }
    }
}

(4)CallerRunsPolicy

调用者执行策略。在新任务被添加到线程池时,如果添加失败,那么提交任务线程会自己去执行该任务,不会使用线程池中的线程去执行新任务。

在以上4种内置策略中,线程池默认的拒绝策略为AbortPolicy,如果提交的任务被拒绝,线程池就会抛出RejectedExecutionException异常,该异常是非受检异常(运行时异常),很容易忘记捕获。如果关心任务被拒绝的事件,需要在提交任务时捕获RejectedExecutionException异常。

public static class CallerRunsPolicy implements RejectedExecutionHandler {

    public CallerRunsPolicy() { }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            //任务调用方直接调用run方法执行任务
            r.run();
        }
    }
}

5、线程池的状态转化

ThreadPoolExecutor内部维护着以下几种线程池状态:

private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

第一眼看上去,这个状态值就很让人费解,每个状态都是向左位移COUNT_BITS得到的值,这是为什么呢?

线程池状态实际上保存在了int类型的高位前三个bit,我们知道三个bit能够表示2^3共8个数,而如果两个bit则只能表示2^2是4个数,所以需要三个bit存储五种状态,这也是为什么COUNT_BITS = Integer.SIZE - 3,就是为了留出来3个bit用来存储状态值。

那剩下的29bit干啥了?实际上是用于存储对应状态的任务数量,这个之后再谈。

RUNNING的状态比较特殊,它使用了-1左移COUNT_BITS,这样能够保证最高位是1,而其他四种状态最高位都是0,总结下来,最高位的三个bit值和五种线程池状态的对应关系如下所示:

线程池状态 十进制数 最高位3bit
RUNNING -1 << 29 111
SHUTDOWN 0 << 29 000
STOP 1<< 29 001
TIDYING 2<< 29 010
TERMINATED 3<< 29 011

线程池创建完成之后的初始状态是RUNNING,最终状态是TERMINATED,五个线程池状态的状态转化如下图所示:

image-20250123105805145

6、线程池的关闭

关闭线程池可以使用两个方法:shutdown和shutdownNow,但是这两个方法有区别

shutdown:是JUC提供的一个有序关闭线程池的方法,此方法会等待当前工作队列中的剩余任务全部执行完成之后,才会执行关闭,但是此方法被调用之后线程池的状态转为SHUTDOWN,线程池不会再接收新的任务。

shutdownNow:是JUC提供的一个立即关闭线程池的方法,此方法会打断正在执行的工作线程,并且会清空当前工作队列中的剩余任务,返回的是尚未执行的任务。

awaitTermination:该方法并非关闭线程池的方法,而是等待线程池彻底关闭的方法,它会阻塞当前线程等待线程池关闭,等线程池关闭之后会唤醒当前线程继续执行后续代码。该方法通常在调用shutdown或者shutdownNow方法后被调用。

7、钩子方法

ThreadPoolExecutor线程池调度器为每个任务执行前后都提供了钩子方法。ThreadPoolExecutor类提供了三个钩子方法(空方法),这三个钩子方法一般用作被子类重写,具体如下:

//任务执行之前的钩子方法(前钩子)
 protected void beforeExecute(Thread t, Runnable r)   { }
 //任务执行之后的钩子方法(后钩子)
 protected void afterExecute(Runnable r, Throwable t) { }
 //线程池终止时的钩子方法(停止钩子)
 protected void terminated() { }

(1)beforeExecute:异步任务执行之前的钩子方法

线程池工作线程在异步执行目标实例(如Runnable实例)前调用此钩子方法。此方法仍然由执行任务的工作线程调用。默认实现不执行任何操作,但可以在子类中对其进行自定义。

此方法由执行目标实例的工作线程调用,可用于重新初始化ThreadLocal线程本地变量实例、更新日志记录、开始计时统计、更新上下文变量等。

(2)afterExecute:异步任务执行之后的钩子方法

线程池工作线程在异步执行目标实例后调用此钩子方法。此方法仍然由执行任务的工作线程调用。此钩子方法的默认实现不执行任何操作,可以在调度器子类中对其进行自定义。

此方法由执行目标实例的工作线程调用,可用于清除ThreadLocal线程本地变量、更新日志记录、收集统计信息、更新上下文变量等。

(3)terminated:线程池终止时的钩子方法

terminated钩子方法在Executor终止时调用,默认实现不执行任何操作。

可以通过继承ThreadPoolExecutor类重写该三个方法以实现日志记录、时间统计等功能。

二、ThreadPoolExecutor线程池实现原理

研究ThreadPoolExecutor源码,我的思路就是顺着调用链走,顺藤摸瓜。说到底,就一个方法,submit方法。

1、任务提交:submit方法

submit方法实际上是AbstractExecutorService的方法

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    //调用ThreadPoolExecutor实现的钩子方法
    execute(ftask);
    return ftask;
}

这里对task封装了一下,就将其交给了execute方法,execute方法是Executor顶级接口定义的核心方法

public interface Executor {
    void execute(Runnable command);
}

实际上是个钩子方法,在这里由ThreadPoolExecutor实现,这体现了模板方法模式的运用。

2、任务执行:execute方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //获取ctl值
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {//解析ctl值中的worker数量,如果小于核心线程数量,则尝试添加worker
        //尝试添加worker,如果成功就结束方法
        if (addWorker(command, true))
            return;
        //如果没成功,重新获取ctl值
        c = ctl.get();
    }
    /**
     * 有两种可能性程序执行到这里
     *   1. 当前工作线程数量>=核心线程数量,那么任务要直接去排队
     *   2. addWorker添加工作线程执行失败
     * 如果线程池是在运行中状态,就将任务直接插入任务队列的尾部
     */
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /**
     * 走到这里大概率是插入队列尾部失败的原因,这说明任务队列已满,需要尝试
     * 新增非核心工作线程解决
     */
    else if (!addWorker(command, false))
        //如果队列已满而且工作线程已经达到了最大线程数量,就执行拒绝策略
        reject(command);
}

这个方法很短,但是实际上比较复杂,它的绝大多数代码都在子方法中,上来当头一棒的就是ctl变量,ctl变量是个什么东西?

ctl常量

ctl变量确切的应该叫做“ctl常量”,它定义在ThreadPoolExecutor类中,用于存储线程池状态以及对应状态下的任务数量。

//位移数量
private static final int COUNT_BITS = Integer.SIZE - 3;
//运行中状态值
private static final int RUNNING    = -1 << COUNT_BITS;
//ctl是"control"的缩写,ctl被用作一个控制变量,它用于存储状态以及对应状态下的任务数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//计算对应状态的数量
private static int ctlOf(int rs, int wc) {
    return rs | wc; 
}

ctl常量是AtomicInteger类型的,它用于存储状态以及对应状态下的任务数量,之前已经提到过,RUNNING状态存储在了int类型数值的最高三位,现在ctl数值使用 RUNNING | wc 或计算的结果初始化,实际上就是将剩余的29bit用来存储任务数量了:

image-20250123152148505

知道了ctl常量的作用,接下来继续看execute方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //获取ctl值
    int c = ctl.get();
    //workerCountOf方法?
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

workerCountOf方法

workerCountOf方法用于提取出ctl值中的任务数量。

private static int workerCountOf(int c)  {
    return c & CAPACITY; 
}

这个方法中用到了一个新常量CAPACITY

private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

CAPACITY常量实际上代表的是剩余29bit能存储的最大正整数值,其二进制表示如下所示

image-20250123153408877

拿它和ctl常量的值做与运算,实际上就是截取了ctl值后29bit的值,也就是获取了RUNNING状态下的任务数量。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //获取ctl值
    int c = ctl.get();
    //获取RUNNING状态下的任务数量并和corePoolSize做比较
    if (workerCountOf(c) < corePoolSize) {
        //满足前置if表示核心线程数不足corePoolSize,走addWork逻辑创建新线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

if (workerCountOf(c) < corePoolSize) 代码逻辑就清楚了然了,就是为了检查线程数量是否小于corePoolSize,如果满足条件,表示核心线程数量还没到corePoolSize,需要创建新线程。

接下来看addWorker方法逻辑。

addWorker

addWorker方法简单来说就是创建线程的方法,并附带线程将要执行的第一个任务firstTask;core方法参数是一个标记,为true的时候表示创建的是核心线程,为false的时候表示核心线程数量已经到了corePoolSize,创建的是核心线程之外但是没到maximumPoolSize的线程;线程创建成功之后将会启动线程。

addWorker方法是个又臭又长的方法,而且里面还使用了Java中不推荐使用的带标签的控制语句,用于控制外部循环的执行,总之有些复杂。

private boolean addWorker(Runnable firstTask, boolean core) {
    //定义retry标签,方便内部循环控制外部循环
    retry:
    //第一层循环,用于检测线程状态,及时发现线程状态变化
    for (;;) {
        //获取ctl的值
        int c = ctl.get();
        //根据ctl提取出对应的状态值
        int rs = runStateOf(c);
        if (rs >= SHUTDOWN &&  //检查是非RUNNNIG状态
            ! (//排除掉非RUNNING状态下,线程池是SHUTDOWN状态,第一个任务为空,但是任务队列不为空的情况;这种情况下说明线程池调用了shutdown()方法,但是队列中还有任务,同时提交过来的firstTask为null表示并没有提交新任务,为了消化掉等待队列中的任务,需要创建新线程
                rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()
              )
           )
            return false;

        //第二层循环,主要是防止CAS修改任务数量失败的情况,for循环确保CAS成功
        for (;;) {
            //查询对应线程池状态下的任务数量
            int wc = workerCountOf(c);
            if (wc >= CAPACITY || //线程数量不可大于CAPACITY,即int类型的后29bit能表示的最大正整数
                wc >= (core ? corePoolSize : maximumPoolSize))//根据core值是true还是false,动态判断是否应该小于corePoolSize还是应该小于maximumPoolSize
                return false;
            
            //CAS自增1,成功了就结束外部循环:表示任务数量已经增加了1(实际上还没有增加到等待队列,失败后会回滚)
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            //重新检查线程池状态,如果线程池状态发生了变化,就要走外部循环重新检测当前线程池状态是否满足创建线程的条件
            if (runStateOf(c) != rs)
                continue retry;
            //其它情况就是CAS失败,内循环重试CAS即可
        }
    }

    //走到这里就表示任务数量已经自增1成功了,接下来要创建对应的线程了
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //创建Worker,同时创建对应的线程,关于Worker,之后再详细分析
        w = new Worker(firstTask);
        //获取到刚创建的线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            //由于会访问到workers(HashMap类型),所以要加锁访问
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
				//为了防止lock方法执行前线程池突然关闭,需要重新检测线程池状态
                if (rs < SHUTDOWN ||  //线程池状态RUNNING符合条件
                    (rs == SHUTDOWN && firstTask == null)//线程池为关闭状态,但是firstTask为null也满足创建条件
                   ) {
                    if (t.isAlive()) //线程刚创建,还没有调用start方法,如果isAlive方法返回true表示Thread状态不对,需要抛出异常
                        throw new IllegalThreadStateException();
                    //添加到workers集合,至此,该方法任务已经完成了一多半
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                //解锁
                mainLock.unlock();
            }
            if (workerAdded) {
                //启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

这个方法有些长,将它分为上下两部分看比较好:上半部分代码是两处for循环,其最终目的就只有一个,那就是将ctl值自增1;下半部分则是ctl自增1之后的事情,ctl自增1表示线程数多了一个,需要创建该线程,并启动该线程,这里的“线程”实际上是Worker,Worker是线程池中的重中之重,Worker是什么,它有什么作用?接下来仔细分析该类以及后续的代码流程。

三、线程池实现的核心:Worker类

Worker类可以认为它代表着线程池中的一个线程,它不代表具体的线程池提交的任务,它代表着“运行线程池任务的任务”。

Worker类实现了实现了Runnable接口,另外还继承了AQS类,实现了非重入独占锁功能。

image-20250124162443770

它有三个成员变量:

thread:存储着执行该Worker的线程

firstTask:用户提交的第一个任务,构造方法传入,可以为null

completedTasks:计数器,完成了多少个用户提交的任务

1、线程可复用的原理

做java开发用到线程池的时候肯定会忍不住想,线程池中的线程为什么能保持不死,而且还能持续不断地执行后续任务?毕竟我们定义线程执行完任务就自己就挂了,那线程池线程可复用的原理是什么?这Worker类是关键。

下面我画了一张线程池中各个组件和类的关系,Worker在其中扮演的角色一目了然

image-20250124153329510

Worker类中维护了一个Thead实例,Thread实例运行的真正的Runnable是Worker本身(Worker实现了Runnable接口),Worker的任务就是先执行firstTask任务,执行完之后不断地从workQueue队列中取任务,然后运行run方法执行;当workQueue中没有任务的时候,获取的线程就会阻塞(workQueue是BlockQueue),等队列中有值的时候就会唤醒阻塞的线程继续取任务。

以上就是ThreadPoolExecutor运行的基本原理,线程要么在执行任务,要么在在阻塞等待取任务,所以它永远不会挂掉,这就实现了可复用。当然,这指的是核心线程,超出corePoolSize创建的非核心线程,则在阻塞获取任务超时之后,线程停止运行,对应的worker被从workers集合中移除。

核心线程如果设置了allowCoreThreadTimeOut标记为true,也会执行和非核心线程一样的逻辑,超时就被回收。

2、构造方法:防止中断的处理

在addWorker方法中创建了Worker实例

w = new Worker(firstTask);

它的源码如下所示

Worker(Runnable firstTask) {
    setState(-1); // 设置为AQS state为-1,在runWorker方法运行前禁止发生中断
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

在构造方法中,创建了新线程,而且将AQS的state值成了-1,从源码注释上来看,是为了在运行runWorker方法前防止发生中断,这怎么防止呢?

我们来看下加锁和释放锁的方法

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

和ReentrantLock相比,没有了重入锁相关的考虑,代码更简洁了;从代码上来看,1是上锁的状态,0是释放锁的状态,再也没有了其它state状态。

那和将state值改为-1能防止发生中断有什么关系呢?其实没啥关系,嘿嘿。防止发生的中断主要是防止线程池调用了shutdownNow方法导致的中断:

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        //在这里会尝试中断所有正在执行任务的线程
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();//在这里调用Worker的interruptIfStarted方法尝试中断线程
    } finally {
        mainLock.unlock();
    }
}

void interruptIfStarted() {
    Thread t;
    //在这里会判定state的状态是否大于等于0,如果设置成-1,线程将不会被中断
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

3、执行任务:runWorker方法

查看Worker的run方法,它调用了ThreadPoolExecutor的runWork方法

public void run() {
    runWorker(this);
}

runWorker方法是真正执行任务的方法,它会先尝试执行firstTask任务,如果firstTask任务为空则从任务队列中取任务,如果任务队列为空,则线程会被阻塞直到队列中有任务被唤醒。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); //之前在构造方法中设置了AQS state值为-1,现在通过unlock方法将state值改成了初始值0,这样就允许中断了
    boolean completedAbruptly = true;
    try {
        while (task != null || //由于上次完成任务后task置为null,该task为null;不为null的情况只有初次运行时firstTask的情况 
               (task = getTask()) != null//获取下一个task,如果后续没有任务,将会被阻塞
              ) {
            //执行任务时加锁
            w.lock();
            /**
             * 这里的if判断比较有意思
             * 1. 如果调用了shutdownNow方法导致线程池状态变成了STOP以及STOP之后的状态,
             *	若没有设置线程中断标记则设置线程中断标记
           	 * 2. 如果线程发生了中断,先取消中断状态
           	 *		如果调用了shutdownNow方法,同上处理shutdownNow方法的逻辑
           	 *      其它情况,不作处理,也就是说如果没有调用shutdownNow方法,而且线程发生了中断,就将中断取消
           	 * 这个if判断总结下来做了两件事情:
           	 * 1. 如果线程池被关闭,确保把线程设置中断标记
           	 * 2. 如果线程池没有被关闭,确保线程没有设置中断标记
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //在执行前先调用beforeExecute钩子方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //执行任务run方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //将发生的异常传递给afterExecute钩子方法
                    afterExecute(task, thrown);
                }
            } finally {
                //完成任务后将当前task任务置为null
                task = null;
                w.completedTasks++;
                //执行完任务释放锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        //线程结束后执行Work回收逻辑
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker方法中的while循环会通过getTask()方法调用获取下一个要执行的任务,接下来看看getTask方法的执行逻辑。

4、获取下一个任务:getTask方法

getTask方法会从workQueue任务队列中取任务,如果任务队列为空,则会阻塞;同时该方法还会修改等待超时的非核心线程数量或者设置了allowCoreThreadTimeOut标志位的等待超时的核心线程数量。

private Runnable getTask() {
    //等待超时标记:等待队列为空的时候会阻塞,如果设置了超时时间,则可能会等待超时
    boolean timedOut = false;
    for (;;) {
        //获取ctl值
        int c = ctl.get();
        //解析ctl值,获取线程池状态
        int rs = runStateOf(c);
        if (
            rs >= SHUTDOWN //如果线程池是关闭状态
            && (
                rs >= STOP //如果调用了shutdownNow方法,剩余队列中的任务就不再处理了
                || workQueue.isEmpty()//如果剩余队列任务是空,也不需要再做处理
               )
           ) {
            //work数量自减
            decrementWorkerCount();
            //返回null表示未获取到下一个任务,这将结束runWorker方法中的循环
            return null;
        }

        //解析ctl值,获取运行中的worker数量(wc即为worker count的意思)
        int wc = workerCountOf(c);
        
        /**
         * timed变量将会影响当前等待超时的worker是否应该被回收,它的值受两个因素的影响:
         *		allowCoreThreadTimeOut标志:如果为true,核心线程将会像非核心线程一样被回收
         *		如果未设置allowCoreThreadTimeOut标志,是否需要回收则取决于是否是核心线程
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
		
        /**
         * worker数量
         *
         *
         */
        if (
                (
                    wc > maximumPoolSize //如果worker数量已经大于最大线程数量,则几乎必定会将当前worker回收
                    || 
                 	(timed && timedOut)//如果time标记为true,而且发生了等待超时,则几乎必定会将当前worker回收
                )
                && 
                (
                    wc > 1 
                    || 
                    workQueue.isEmpty()
                )
           ) {
            if (compareAndDecrementWorkerCount(c))//如果CAS自减成功,就返回null
                return null;
            continue;//如果CAS失败,则下次for循环再重试
        }

        //运行到这里表示当前worker还不满足需要被回收的条件
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://带有超时等待机制的获取workQueue队列中的元素
                workQueue.take();//直接获取workQueue队列中的元素,workQueue队列为空则无限期阻塞等待
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

getTask方法的目的是获取队列中的任务,如果返回null表示获取失败,则会终止Worker run方法中的循环,之后该Worker将会执行processWorkerExit逻辑。

5、woker线程退出:processWorkerExit

worker循环终止,则线程即将挂掉,挂掉之前需要执行processWorkerExit方法做一些善后工作

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    /**
     * completedAbruptly标记记录了是否是任务方法自己发生的异常导致的循环退出
     *		1. 如果是执行任务时发生的异常,则必定ctl值没有发生自减,需要在这里自减
     *		2. 如果不是执行任务时发生的额异常,则必定是getTask方法未获取到任务导致的退出,
     *		   getTask已经将ctl值自减,就不需要再执行自减了
     */
    if (completedAbruptly)
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    //操作works移除前,需要加锁
    mainLock.lock();
    try {
        //完成任务的总数量增加
        completedTaskCount += w.completedTasks;
        //works集合移除指定任务
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
	
    //尝试彻底关闭线程池,让其变成终态TERMINATED	
    tryTerminate();
	
	//获取ctl值
    int c = ctl.get();
    //TODO
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; 
        }
        addWorker(null, false);
    }
}

上述代码中唯一有疑问的是最后一段if代码块

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        if (min == 0 && ! workQueue.isEmpty())
            min = 1;
        if (workerCountOf(c) >= min)
            return; // replacement not needed
    }
    addWorker(null, false);
}

这段代码竟然重新添加了新Worker,这是为什么?明明这个方法就是为了将当前Worker移除掉。

其实,这段代码的真正意图是防止线程池还没彻底关闭的情况下(STOP状态之前),等待队列中还有任务,但是线程池中已经没有可用线程的情况,这样等待队列中的任务将无人处理了。

我将代码加上注释,如下所示

//获取ctl值
int c = ctl.get();
if (runStateLessThan(c, STOP)) {//只有线程池状态小于STOP状态才允许添加新Worker替换挂掉的当前Worker
    if (!completedAbruptly) {
        int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
        if (min == 0 && ! workQueue.isEmpty())
            min = 1;
        if (workerCountOf(c) >= min)
            return; 
    }
    /**
     * 两种情况可能会执行addWorker
     * 	1. completedAbruptly为true,表示当前工作线程是因为执行任务期间被异常中断的,这种情况下新建线程恢复之前的运行状态
     *  2. completedAbruptly为false,表示当前工作线程不是在执行任务期间被异常中断的,这种情况要考虑当前worker挂掉之后
     *     核心线程数量是否足够的问题;如果设置了allowCoreThreadTimeOut为true,那理论上允许的最小核心线程数量是0,如果
     *     这时候任务队列不为空,则允许的最小核心线程就变成1了,因为毕竟要有线程去处理队列中的任务
     *  以上两种情况都需要重新创建Worker替换掉挂掉的worker
     */
    addWorker(null, false);
}

这段if块也充分说明了线程池SHUTDOWN状态下也不会放弃任务队列中的任务。

到这里,addWorker方法就分析的差不多了。再看之前的execute方法是不是就有感觉了?

四、线程池生命周期

之前已经说过线程池的状态转化

image-20250123105805145

在上两节文章中已经分析了execute方法,实际上是RUNNING状态下的线程池提交并执行任务的逻辑,接下来分析线程池其它生命周期中的细节。

1、柔和关闭线程池:shutdown方法

shutdown方法调用之后线程池将变成SHUTDOWN状态,但是会将任务队列中的任务全部执行完才会彻底关闭线程池。其源码如下

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();//检查当前线程是否有关闭线程的权限
        advanceRunState(SHUTDOWN);//将线程池的状态修改为SHUTDOWN
        interruptIdleWorkers();//尝试中断所有空闲Worker线程
        onShutdown(); // ScheduledThreadPoolExecutor专用钩子方法
    } finally {
        mainLock.unlock();
    }
    //尝试关闭线程池
    tryTerminate();
}

在这个方法中,需要关注下interruptIdleWorkers方法以及tryTerminate方法。

interruptIdleWorkers:尝试中断所有空闲Worker线程

tryTerminate:尝试关闭线程池,该方法是关闭线程池的最终方法。

中断空闲线程:interruptIdleWorkers方法

interruptIdleWorkers方法是中断“空闲”线程的方法,顾名思义,如果线程是非空闲状态,它不会去尝试中断,因为SHUTDOWN状态下所有任务都应当正常被处理完毕。

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

/**
 * onlyOne这个参数很特殊,如果它被设置为true,interruptIdleWorkers将会最多中断一个线程,
 * 如果是interruptIdleWorkers()方法调用,则会中断所有空闲的工作线程,这是为什么呢?待会儿分析
 * tryTerminate再说,onlyOne参数为true的情况只会在tryTerminate方法中出现。
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //遍历所有工作线程
        for (Worker w : workers) {
            Thread t = w.thread;
            //如果能tryLock成功,就说明线程空闲
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    //设置中断标记
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            //如果设置了onlyOne标记,则只中断一个线程就返回
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

interruptIdleWorkers()方法会尝试中断所有空闲的线程,判断空闲的办法就是tryLock能否成功,因为从之前的runWorker方法中可以看到,整个任务执行的过程中都加了独占锁,如果工作线程在执行任务,那interruptIdleWorkers中的tryLock必然会失败;反之说明工作线程空闲。

由于线程空闲,所以它必定阻塞在queue.take() 或者 queue.poll()方法上,此时调用线程的interrupt方法,线程会响应中断,则getTask方法会返回null,runWorker将结束循环。

彻底关闭线程池:tryTerminate方法

tryTerminate方法会尝试彻底关闭线程池,shutdown方法以及shutdownNow方法的最后一步会调用该方法。

final void tryTerminate() {
    //防止CAS失败,for循环重试
    for (;;) {
        //获取ctl的值
        int c = ctl.get();
        
        if (isRunning(c) || //线程池状态是运行状态不允许关闭线程池
            runStateAtLeast(c, TIDYING) || //或者线程池状态已经是TIDYING状态或者之后的状态,也不允许关闭线程池
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //或者线程池状态是SHUTDOWN,但是任务队列非空,也不允许关闭线程池
            return;
        
        //如果工作线程数量不为0
        if (workerCountOf(c) != 0) {
            //对第一个空闲任务执行中断操作,触发中断传播
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //CAS方式设置线程池状态为TIDYING状态
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    //调用terminated钩子方法
                    terminated();
                } finally {
                    //设置线程池状态为TERMINATED状态
                    ctl.set(ctlOf(TERMINATED, 0));
                    //通知awaitTermination方法线程池已经关闭
                    termination.signalAll();
                }
                //成功了,就返回并中断循环
                return;
            }
        } finally {
            mainLock.unlock();
        }
        //其它情况说明CAS失败了,下次for循环重试
    }
}

在条件合适的情况下,tryTerminate方法会将线程池状态从SHUTDOWN状态更改为TIDYING状态,调用完terminated钩子方法之后,将线程池状态更改为TERMINATED状态。

上述代码中最大的疑问就是下面这段代码:

//如果工作线程数量不为0
if (workerCountOf(c) != 0) {
    //对第一个空闲工作线程执行中断操作
    interruptIdleWorkers(ONLY_ONE);
    return;
}

在上一节分析interruptIdleWorkers()方法的时候,有个重载方法interruptIdleWorkers(boolean onlyOne),当onlyOne设置为true的时候,只会中断第一个空闲的工作线程,之后的不再中断了,调用点正是这里。

tryTerminate方法的目的是彻底关闭线程池,仅仅是将线程池的状态更改为TERMINATED状态是不行的,还得将所有线程都杀死才行,上面这段只对第一个空闲工作线程执行中断的操作正是杀死所有线程的保险。

2、中断传播机制

关闭线程池的时候,为了线程池在变成TIDYING状态前所有的线程都能关闭,shutdown方法中,先在interruptIdleWorkers方法调用中将所有空闲线程都调用了interrupt方法,这些线程本来由于任务队列为空都阻塞到了queue.take() 或者 queue.poll()方法上,此时调用线程的interrupt方法,线程必定会响应中断,则getTask方法会返回null,runWorker方法将结束循环,线程将退出。但是非空闲线程则不满足条件,没有调用interrupt方法设置中断标记。那这些线程什么时候退出呢?

非空闲线程执行完任务之后会阻塞在queue.take() 或者 queue.poll()方法上,由于线程没有中断设置中断标记,所以这些线程都将无法响应中断。shutdown方法都执行完了,这些的方法反而阻塞住了,这该怎么办?

办法就在于再触发随机一个工作线线程的中断,所以才有了tryTerminate的这段代码:

//如果工作线程数量不为0
if (workerCountOf(c) != 0) {
    //对第一个空闲工作线程执行中断操作
    interruptIdleWorkers(ONLY_ONE);
    return;
}

空闲线程被阻塞必然会响应中断,线程被回收,runWorker线程退出前会执行processWorkExit方法,关键就在于worker线程退出时执行的processWorkExit方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    //尝试停止线程池
    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

发现了吧,从tryTerminate方法中断阻塞的空闲工作线程,空闲的工作线程退出前又调用了tryTerminate方法,接着又中断剩下的随机一个空闲的工作线程。。。。循环往复,形成了一个中断传播链,最终结束了所有的空闲线程,tryTerminate方法就可以彻底关闭线程了。

说到底,还是因为shutdown方法没有彻底的关闭所有线程,tryTerminate为此做了兜底措施。

3、暴力关闭线程池:shutdownNow方法

shutdownNow方法相对于shutdown方法来说,从处理上来看,就暴力且简单的多了:直接修改线程池状态到STOP状态,暴力中断所有正在执行任务的工作线程,清空所有等待队列中的任务,最后尝试关闭线程池。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //shutdown方法是修改状态到shutdown状态,而shutdownNow方法则直接修改到STOP状态
        advanceRunState(STOP);
        //shutdown方法是尝试中断空闲工作线程,而shutdownNow方法则不管是否空闲,全部中断
        interruptWorkers();
        //清空等待队列中的任务,并且转移到新的集合
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    //尝试关闭线程池
    tryTerminate();
    //返回未完成的任务列表
    return tasks;
}

对于shutdownNow来说,由于清空了任务列表,所以工作线程执行完手里的任务之后很快就会阻塞,tryTerminate方法成功的概率将会变得更大。

五、其它细节

1、线程池的懒加载和预加载

线程池中的线程并非是线程池初始化之后就默认创建好的,默认情况下,什么时候用到了,什么时候创建线程。线程池中线程有两种加载方式:懒加载和预加载。

懒加载:线程池的默认行为,线程池提交任务之后什么时候需要创建线程再创建。

预加载:即使线程池还没有提交过一个任务,也要先提前创建好线程预备使用。

懒加载原理

先回顾下execute方法:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    //如果工作线程数量小于核心线程数量
    if (workerCountOf(c) < corePoolSize) {
        //创建新的工作线程,command为firstTask
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

其实我们分析的一直都是懒加载的过程,工作线程数量小于核心线程数量,就创建新的工作线程。

预加载原理

关于预加载,一直没有提及,ThreadPoolExecutor有个prestartAllCoreThreads方法如下:

public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))
        ++n;
    return n;
}

该方法会在没有提交任何任务的情况下初始化核心工作线程,firstTask参数为null。在runWorker方法中,firstTask为空的情况下,工作线程会直接从workQueue队列中取任务,由于等待队列是空的,工作线程会立即进入阻塞状态。




END.


#java #多线程编程
复制 复制成功