基本概述

线程池:一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作

线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

线程池的作用:

1、降低资源的消耗,减少了创建和销毁任务的次数,每个工作线程都可以被重复利用,可执行多个任务

2、提高响应的速度,当任务到达时,如果有线程可以直接使用,不会出现系统僵死。

3、提高线程的管理,如果无限制的创建线程,不仅会消耗系统的资源,还会降低系统的稳定 性,使用线程池可以统一的分配调控。

线程池解决的问题是什么

线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:

  1. 频繁申请 / 销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
  2. 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
  3. 系统无法合理管理内部的资源分布,会降低系统的稳定性。

为解决资源分配这个问题,线程池采用了 “池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。

Pooling is the grouping together of resources (assets, equipment, personnel, effort, etc.) for the purposes of maximizing advantage or minimizing risk to the users. The term is used in finance, computing and equipment management.——wikipedia

“池化” 思想不仅仅能应用在计算机领域,在金融、设备、人员管理、工作管理等领域也有相关的应用。

在计算机领域中的表现为:统一管理 IT 资源,包括服务器、存储、和网络资源等等。通过共享资源,使用户在低投入中获益。除去线程池,还有其他比较典型的几种使用策略包括:

  1. 内存池 (Memory Pooling):预先申请内存,提升申请内存速度,减少内存碎片。
  2. 连接池 (Connection Pooling):预先申请数据库连接,提升申请连接的速度,降低系统的开销。
  3. 实例池 (Object Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗。

架构说明

Java 中线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors(代表工具类),ExecutorService,ThreadPoolExecutor 这几个类。

image-20220822192701970

image-20220822192709359

操作 Pool

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING .

因为第一位是符号位,RUNNING 是负数,所以最小.

image.png

这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值

1
2
3
4
5
6
7
8

// c 为旧值, ctlOf 返回结果为新值

ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));

// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们

private static int ctlOf(int rs, int wc) { return rs | wc; }

创建方式

Executor

存放线程的容器:

1
2

private final HashSet<Worker> workers = new HashSet<Worker>();

构造方法:

1
2
3
4
5
6
7
8

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

线程池各个参数的关系

参数介绍:

  • corePoolSize:核心线程数,线程池中的常驻核心线程数

    • 在创建线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务,近似理解为今日当值线程
    • 当线程池中的线程数目达到 corePoolSize 后,就会把到达的队列放到缓存队列中
  • maximumPoolSize:最大线程数,当队列中存放的任务达到队列容量时,当前可以同时运行的数量变为最大线程数,创建线程并立即执行最新的任务,与核心线程数之间的差值又叫救急线程数

  • keepAliveTime:多余的空闲线程存活时间

    • 当线程池数量超过 corePoolSize 时,当空闲时间达到 keepAliveTime 值时,多余的空闲线程会被销毁,直到只剩下 corePoolSize 个线程为止
    • 默认情况下,只有当线程池中的线程数大于 corePoolSize 时,keepAliveTime 才会起作用
  • unit:keepAliveTime 参数的时间单位

  • workQueue:任务队列,被提交的但未被执行的任务(类似于银行里面的候客区)

    • LinkedBlockingQueue:链表阻塞队列
    • SynchronousBlockingQueue:同步阻塞队列
  • threadFactory:线程工厂,创建新线程时用到,可以为线程创建时起名字

  • handler:拒绝策略,线程到达最大线程数仍有新任务时会执行拒绝策略

    以下所有拒绝策略都实现了 RejectedExecutionHandler 接口

    • AbortPolicy:默认,直接抛出 RejectedExcutionException 异常,阻止系统正常运行
    • DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常,如果运行任务丢失,这是一种好方案
    • CallerRunsPolicy:该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者
    • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务

    补充:其他框架拒绝策略

    • Dubbo:在抛出 RejectedExecutionException 异常前记录日志,并 dump 线程栈信息,方便定位问题
    • Netty:创建一个新线程来执行任务
    • ActiveMQ:带超时等待(60s)尝试放入队列
    • PinPoint:它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略

工作原理:

img

image-20220822193807378

  1. 创建线程池,这时没有创建线程(懒惰),等待提交过来的任务请求,调用 execute 方法才会创建线程
  2. 当调用 execute () 方法添加一个请求任务时,线程池会做如下判断:
    • 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务
    • 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列
    • 如果这时队列满了且正在运行的线程数量还小于 maximumPoolSize,那么会创建非核心线程立刻运行这个任务,对于阻塞队列中的任务不公平。这是因为创建每个 Worker(线程)对象会绑定一个初始任务,启动 Worker 时会优先执行
    • 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行
  3. 当一个线程完成任务时,会从队列中取下一个任务来执行
  4. 当一个线程空闲超过一定的时间(keepAliveTime)时,线程池会判断:如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉,所以线程池的所有任务完成后最终会收缩到 corePoolSize 大小

图片来源:https://space.bilibili.com/457326371/

以顾客去银行办理业务为例,谈谈线程池的底层工作原理

  1. 最开始假设来了两个顾客,因为 corePoolSize 为 2,因此这两个顾客直接能够去窗口办理
  2. 后面又来了三个顾客,因为 corePool 已经被顾客占用了,因此只有去候客区,也就是阻塞队列中等待
  3. 后面的人又陆陆续续来了,候客区可能不够用了,因此需要申请增加处理请求的窗口,这里的窗口指的是线程池中的线程数,以此来解决线程不够用的问题
  4. 假设受理窗口已经达到最大数,并且请求数还是不断递增,此时候客区和线程池都已经满了,为了防止大量请求冲垮线程池,已经需要开启拒绝策略
  5. 临时增加的线程会因为超过了最大存活时间,就会销毁,最后从最大数削减到核心数

Executors

Executors 提供了四种线程池的创建:newCachedThreadPool、newFixedThreadPool、newSingleThreadExecutor、newScheduledThreadPool

  • newFixedThreadPool:创建一个拥有 n 个线程的线程池

    1
    2
    3
    4
    5

    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
    • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
    • LinkedBlockingQueue 是一个单向链表实现的阻塞队列,默认大小为 Integer.MAX_VALUE,也就是无界队列,可以放任意数量的任务,在任务比较多的时候会导致 OOM(内存溢出)
    • 适用于任务量已知,相对耗时的长期任务
  • Executors.newCacheThreadPool ():创建一个可扩容的线程池

    1
    2
    3
    4
    5

    public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
    new SynchronousQueue<Runnable>());
    }
    • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,全部都是救急线程(60s 后可以回收),可能会创建大量线程,从而导致 OOM
    • SynchronousQueue 作为阻塞队列,没有容量,对于每一个 take 的线程会阻塞直到有一个 put 的线程放入元素为止(类似一手交钱、一手交货)
    • 适合任务数比较密集,但每个任务执行时间较短的情况
  • Executors.newSingleThreadExecutor:创建一个只有 1 个线程的单线程池

    1
    2
    3
    4
    5
    6

    public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>()));
    }
    • 一个任务一个任务执行的场景
    • 保证所有任务按照指定顺序执行,线程数固定为 1,任务数多于 1 时会放入无界队列排队,任务执行完毕,这唯一的线程也不会被释放

对比:

  • 创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,线程池会新建一个线程,保证池的正常工作

  • Executors.newSingleThreadExecutor () 线程个数始终为 1,不能修改。FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法

    原因:父类不能直接调用子类中的方法,需要反射或者创建对象的方式,可以调用子类静态方法

  • Executors.newFixedThreadPool (1) 初始时为 1,可以修改。对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改

img

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

public class MyThreadPoolDemo {
public static void main(String[] args) {

// Array Arrays(辅助工具类)
// Collection Collections(辅助工具类)
// Executor Executors(辅助工具类)


// 一池5个处理线程(用池化技术,一定要记得关闭)
ExecutorService threadPool = Executors.newFixedThreadPool(5);

// 模拟10个用户来办理业务,每个用户就是一个来自外部请求线程
try {

// 循环十次,模拟业务办理,让5个线程处理这10个请求
for (int i = 0; i < 10; i++) {
final int tempInt = i;
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 给用户:" + tempInt + " 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}

}
}

最后结果:

1
2
3
4
5
6
7
8
9
10
11

pool-1-thread-1 给用户:0 办理业务
pool-1-thread-5 给用户:4 办理业务
pool-1-thread-1 给用户:5 办理业务
pool-1-thread-4 给用户:3 办理业务
pool-1-thread-2 给用户:1 办理业务
pool-1-thread-3 给用户:2 办理业务
pool-1-thread-2 给用户:9 办理业务
pool-1-thread-4 给用户:8 办理业务
pool-1-thread-1 给用户:7 办理业务
pool-1-thread-5 给用户:6 办理业务

我们能够看到,一共有 5 个线程,在给 10 个用户办理业务


线程池的拒绝策略有哪些?

1、ThreadPoolExecutor.AbortPolicy:丢弃任务抛出 RejectedExecutionException 异常
打断当前执行流程 (默认)

使用场景:ExecutorService 接口的系列 ThreadPoolExecutor 因为都没有显示的设置
拒绝策略,所以默认的都是这个。ExecutorService 中的线程池实例队列都是无界的,
也就是说把内存撑爆了都不会触发拒绝策略。当自己自定义线程池实例时,使用这
个策略一定要处理好触发策略时抛的异常,因为他会打断当前的执行流程。

2、ThreadPoolExecutor.CallerRunsPolicy:只要线程池没有关闭,就由提交任务的当
前线程处理。

使用场景:一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因
为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但是由于是调用
者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然
就慢了。

3、ThreadPoolExecutor.DiscardPolicy:直接静悄悄的丢弃这个任务,不触发任何动作
使用场景:如果提交的任务无关紧要,你就可以使用它 。因为它就是个空实现,会
悄无声息的吞噬你的的任务。所以这个策略基本上不用了

4、ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,重新提交被拒
绝的任务

使用场景:这个策略还是会丢弃任务,丢弃时也是毫无声息,但是特点是丢弃的是
老的未执行的任务,而且是待执行优先级较高的任务。基于这个特性,我能想到的
场景就是,发布消息,和修改消息,当消息发布出去后,还未执行,此时更新的消
息又来了,这个时候未执行的消息的版 > 本比现在提交的消息版本要低就可以被丢弃
了。因为队列中还有可能存在消息版本更低的消息会排队执行,所以在真正处理消
息的时候一定要做好消息的版本比较。

ps:apple: 以上内置拒绝策略均实现了 RejectedExecutionHandler 接口,若以上策略
仍无法满足实际需要,完全可以自己扩展 RejectedExecutionHandler 接口。

开发要求

阿里巴巴 Java 开发手册要求:

  • 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程

    • 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题
    • 如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者过度切换的问题
  • 线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式更加明确线程池的运行规则,规避资源耗尽的风险

    Executors 返回的线程池对象弊端如下:

    • FixedThreadPool 和 SingleThreadPool:请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM
    • CacheThreadPool 和 ScheduledThreadPool:允许创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,导致 OOM

创建多大容量的线程池合适?

  • 一般来说池中总线程数是核心池线程数量两倍,确保当核心池有线程停止时,核心池外有线程进入核心池

  • 过小会导致程序不能充分地利用系统资源、容易导致饥饿

  • 过大会导致更多的线程上下文切换,占用更多内存

    上下文切换:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态,任务从保存到再加载的过程就是一次上下文切换

核心线程数常用公式:

  • CPU 密集型任务 (N+1): 这种任务消耗的是 CPU 资源,可以将核心线程数设置为 N (CPU 核心数) + 1,比 CPU 核心数多出来的一个线程是为了防止线程发生缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 某个核心就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间

    CPU 密集型简单理解就是利用 CPU 计算能力的任务比如在内存中对大量数据进行分析

  • I/O 密集型任务: 这种系统 CPU 处于阻塞状态,用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用,因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N 或 CPU 核数 / (1 - 阻塞系数),阻塞系数在 0.8~0.9 之间

    IO 密集型就是涉及到网络读取,文件读取此类任务 ,特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上

样例:

1
Runnable`+`ThreadPoolExecutor

首先创建一个 Runnable 接口的实现类(当然也可以是 Callable 接口,我们上面也说了两者的区别。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

import java.util.Date;

/**
* 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
* @author shuang.kou
*/
public class MyRunnable implements Runnable {

private String command;

public MyRunnable(String s) {
this.command = s;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
processCommand();
System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
}

private void processCommand() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public String toString() {
return this.command;
}
}

编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
PLAINTEXT
ThreadPoolExecutorDemo.java
JAVA
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {

private static final int CORE_POOL_SIZE = 5;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
private static final Long KEEP_ALIVE_TIME = 1L;
public static void main(String[] args) {

//使用阿里巴巴推荐的创建线程池的方式
//通过ThreadPoolExecutor构造函数自定义参数创建
ThreadPoolExecutor executor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy());

for (int i = 0; i < 10; i++) {
//创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
Runnable worker = new MyRunnable("" + i);
//执行Runnable
executor.execute(worker);
}
//终止线程池
executor.shutdown();
while (!executor.isTerminated()) {
}
System.out.println("Finished all threads");
}
}

可以看到我们上面的代码指定了:

  1. corePoolSize: 核心线程数为 5。
  2. maximumPoolSize :最大线程数 10
  3. keepAliveTime : 等待时间为 1L。
  4. unit: 等待时间的单位为 TimeUnit.SECONDS。
  5. workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;
  6. handler: 饱和策略为 CallerRunsPolicy

Output:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

pool-1-thread-3 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:37 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-5 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-4 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-3 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-2 Start. Time = Sun Apr 12 11:14:42 CST 2020
pool-1-thread-1 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-4 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-5 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-3 End. Time = Sun Apr 12 11:14:47 CST 2020
pool-1-thread-2 End. Time = Sun Apr 12 11:14:47 CST 2020

创建线程池方式二:spring 的 ThreadPoolTaskExecutor 方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

@Configuration
public class ExecturConfig {
@Bean("taskExector")
public Executor taskExector() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);//
核心池大小
executor.setMaxPoolSize(5);//
最大线程数
executor.setQueueCapacity(3);//
队列长度
executor.setKeepAliveSeconds(10);//
线程空闲时间
executor.setThreadNamePrefix("tsak-asyn");//
线程前缀名称
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy
());//
配置拒绝策略
return executor;
}
}

提交方法

ExecutorService 类 API:

方法 说明
void execute(Runnable command) 执行任务(Executor 类 API)
Future<?> submit(Runnable task) 提交任务 task ()
Future submit(Callable task) 提交任务 task,用返回值 Future 获得任务执行结果
List invokeAll(Collection<? extends Callable> tasks) 提交 tasks 中所有任务
List invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) 提交 tasks 中所有任务,超时时间针对所有 task,超时会取消没有执行完的任务,并抛出超时异常
T invokeAny(Collection<? extends Callable> tasks) 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消

execute 和 submit 都属于线程池的方法,对比:

  • execute 只能执行 Runnable 类型的任务,没有返回值; submit 既能提交 Runnable 类型任务也能提交 Callable 类型任务,底层是封装成 FutureTask,然后调用 execute 执行
  • execute 会直接抛出任务执行时的异常,submit 会吞掉异常,可通过 Future 的 get 方法将任务执行时的异常重新抛出

关闭方法

ExecutorService 类 API:

方法 说明
void shutdown() 线程池状态变为 SHUTDOWN,等待任务执行完后关闭线程池,不会接收新任务,但已提交任务会执行完,而且也可以添加线程(不绑定 ren’wu)
List shutdownNow() 线程池状态变为 STOP,用 interrupt 中断正在执行的任务,直接关闭线程池,不会接收新任务,会将队列中的任务返回
boolean isShutdown() 不在 RUNNING 状态的线程池,此执行者已被关闭,方法返回 true
boolean isTerminated() 线程池状态是否是 TERMINATED,如果所有任务在关闭后完成,返回 true
boolean awaitTermination(long timeout, TimeUnit unit) 调用 shutdown 后,由于调用线程不会等待所有任务运行结束,如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待

处理异常

execute 会直接抛出任务执行时的异常,submit 会吞掉异常,有两种处理方法

方法 1:主动捉异常

1
2
3
4
5
6
7
8
9
10

ExecutorService executorService = Executors.newFixedThreadPool(1);
pool.submit(() -> {
try {
System.out.println("task1");
int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
}
});

方法 2:使用 Future 对象

1
2
3
4
5
6
7
8

ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<?> future = pool.submit(() -> {
System.out.println("task1");
int i = 1 / 0;
return true;
});
System.out.println(future.get());

线程池状态有哪些?

img

图3 线程池生命周期

  • 1.RUNNING 状态说明:线程池处在 RUNNING 状态时,能够接收新任务,以及对已
    添加的任务进行处理。 状态切换:线程池的初始化状态是 RUNNING。换句话说,
    线程池被一旦被创建,就处于 RUNNING 状态,并且线程池中的任务数为 0

  • 2.SHUTDOWN 状态说明:线程池处在 SHUTDOWN 状态时,不接收新任务,但能处
    理已添加的任务。 状态切换:调用线程池的 shutdown () 接口时,线程池由 RUNNING
    -> SHUTDOWN。

  • 3.STOP 状态说明:线程池处在 STOP 状态时,不接收新任务,不处理已添加的任务,
    并且会中断正在处理的任务。 状态切换:调用线程池的 shutdownNow () 接口时,线
    程池由 (RUNNING or SHUTDOWN) -> STOP。

  • 4.TIDYING 状态说明:当所有的任务已终止,ctl 记录的” 任务数量” 为 0,线程池会
    变为 TIDYING 状态。当线程池变为 TIDYING 状态时,会执行钩子函数 terminated ()。
    terminated () 在 ThreadPoolExecutor 类中是空的,若用户想在线程池变为 TIDYING 时,
    进行相应的处理;可以通过重载 terminated () 函数来实现。

    状态切换:当线程池在 SHUTDOWN 状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在 STOP 状态下,线程池中执行的任务为空时,
    就会由 STOP -> TIDYING。

  • 5.TERMINATED 状态说明:线程池彻底终止,就变成 TERMINATED 状态。 状态切换:
    线程池处在 TIDYING 状态时,执行完 terminated () 之后,就会由 TIDYING ->
    TERMINATED。

    shutdown 和 shutdownNow 的区别?

    ・shutdown() : 关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受
    新任务了,但是队列里的任务得执行完毕。
    ・shutdownNow() : 关闭线程池,线程的状态变为 STOP。线程池会终止当前正
    在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。

Tomcat (的) 线程池 (策略)

Tomcat 在哪里用到了线程池呢

img

  • LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
  • Acceptor 只负责【接收新的 socket 连接】
  • Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
  • 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
  • Executor 线程池中的工作线程最终负责【处理请求】

扩展了 ThreadPoolExecutor

Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同

  • 如果总线程数达到 maximumPoolSize
    • 这时不会立刻抛 RejectedExecutionException 异常
    • 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常

源码 tomcat-7.0.42

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
Thread.interrupted();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}

TaskQueue.java

1
2
3
4
5
6
7
8
9

public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if ( parent.isShutdown() )
throw new RejectedExecutionException(
"Executor not running, can't force a command into the queue"
);
return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task
is rejected
}

Connector 配置

image.png

image.png

image.png


任务调度

Timer

Timer 实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

private static void method1() {
Timer timer = new Timer();
TimerTask task1 = new TimerTask() {
@Override
public void run() {
System.out.println("task 1");
//int i = 1 / 0;//任务一的出错会导致任务二无法执行
Thread.sleep(2000);
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run() {
System.out.println("task 2");
}
};
// 使用 timer 添加两个任务,希望它们都在 1s 后执行
// 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此任务1的延时,影响了任务2的执行
timer.schedule(task1, 1000);//17:45:56 c.ThreadPool [Timer-0] - task 1
timer.schedule(task2, 1000);//17:45:58 c.ThreadPool [Timer-0] - task 2
}

Scheduled

任务调度线程池 ScheduledThreadPoolExecutor 继承 ThreadPoolExecutor:

  • 使用内部类 ScheduledFutureTask 封装任务
  • 使用内部类 DelayedWorkQueue 作为线程池队列
  • 重写 onShutdown 方法去处理 shutdown 后的任务
  • 提供 decorateTask 方法作为 ScheduledFutureTask 的修饰方法,以便开发者进行扩展

构造方法:Executors.newScheduledThreadPool(int corePoolSize)

1
2
3
4
5
6
7

public ScheduledThreadPoolExecutor(int corePoolSize) {
// 最大线程数固定为 Integer.MAX_VALUE,保活时间 keepAliveTime 固定为 0
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
// 阻塞队列是 DelayedWorkQueue
new DelayedWorkQueue());
}

常用 API:

  • ScheduledFuture<?> schedule(Runnable/Callable<V>, long delay, TimeUnit u):延迟执行任务
  • ScheduledFuture<?> scheduleAtFixedRate(Runnable/Callable<V>, long initialDelay, long period, TimeUnit unit):定时执行周期任务,不考虑执行的耗时,参数为初始延迟时间、间隔时间、单位
  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable/Callable<V>, long initialDelay, long delay, TimeUnit unit):定时执行周期任务,考虑执行的耗时,参数为初始延迟时间、间隔时间、单位

基本使用:

  • 延迟任务,但是出现异常并不会在控制台打印,也不会影响其他线程的执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

package hm.Executor;


import java.util.Date;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TestScheduledExecutorService {
public static void main(String[] args){
// 线程池大小为1时也是串行执行
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 添加两个任务,都在 1s 后同时执行
executor.schedule(() -> {
System.out.println("任务1,执行时间:" + new Date());
//int i = 1 / 0;
try { Thread.sleep(2000); } catch (InterruptedException e) { }
}, 4000, TimeUnit.MILLISECONDS);

executor.schedule(() -> {
System.out.println("任务2,执行时间:" + new Date());
}, 1000, TimeUnit.MILLISECONDS);
}
}
  • 定时任务 scheduleAtFixedRate:一次任务的启动到下一次任务的启动之间只要大于等于间隔时间,抢占到 CPU 就会立即执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

package hm.Executor;

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TestScheduleAtFixedRate {

public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
System.out.println("start..." + new Date());

pool.scheduleAtFixedRate(() -> {
System.out.println("running..." + new Date());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 10, 1, TimeUnit.SECONDS);

}
}

第一次延时执行的时间是 10s 钟,之后每隔一秒执行一次,

1
2
3
4
5
6

start...Mon Aug 22 17:54:47 CST 2022
running...Mon Aug 22 17:54:57 CST 2022
running...Mon Aug 22 17:54:59 CST 2022
running...Mon Aug 22 17:55:01 CST 2022
running...Mon Aug 22 17:55:03 CST 2022

此方法会由于上一次时间间隔而推迟本次的时间的间隔

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

start...Mon Aug 22 18:00:39 CST 2022
running...Mon Aug 22 18:00:40 CST 2022
running...Mon Aug 22 18:00:44 CST 2022
running...Mon Aug 22 18:00:48 CST 2022
running...Mon Aug 22 18:00:52 CST 2022
running...Mon Aug 22 18:00:56 CST 2022
running...Mon Aug 22 18:01:00 CST 2022
running...Mon Aug 22 18:01:04 CST 2022
running...Mon Aug 22 18:01:08 CST 2022
running...Mon Aug 22 18:01:12 CST 2022
running...Mon Aug 22 18:01:16 CST 2022
JAVA
ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
System.out.println("start..." + new Date());

pool.scheduleAtFixedRate(() -> {
System.out.println("running..." + new Date());
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 1, 1, TimeUnit.SECONDS);

}
  • 定时任务 scheduleWithFixedDelay:一次任务的结束到下一次任务的启动之间等于间隔时间,抢占到 CPU 就会立即执行,这个方法才是真正的设置两个任务之间的间隔
1
2
3
4
5
6
7

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleWithFixedDelay(()-> {
log.debug("running...");
sleep(2);
}, 1, 1, TimeUnit.SECONDS);

成员属性

成员变量
  • shutdown 后是否继续执行周期任务:

    1
    2

    private volatile boolean continueExistingPeriodicTasksAfterShutdown;
  • shutdown 后是否继续执行延迟任务:

    1
    2

    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
  • 取消方法是否将该任务从队列中移除:

    1
    2
    3

    // 默认 false,不移除,等到线程拿到任务之后抛弃
    private volatile boolean removeOnCancel = false;
  • 任务的序列号,可以用来比较优先级:

    1
    2

    private static final AtomicLong sequencer = new AtomicLong();

延迟任务

ScheduledFutureTask 继承 FutureTask,实现 RunnableScheduledFuture 接口,具有延迟执行的特点,覆盖 FutureTask 的 run 方法来实现对延时执行、周期执行的支持。对于延时任务调用 FutureTask#run,而对于周期性任务则调用 FutureTask#runAndReset 并且在成功之后根据 fixed-delay/fixed-rate 模式来设置下次执行时间并重新将任务塞到工作队列

在调度线程池中无论是 runnable 还是 callable,无论是否需要延迟和定时,所有的任务都会被封装成 ScheduledFutureTask

成员变量:

  • 任务序列号:

    1
    2

    private final long sequenceNumber;
  • 执行时间:

    1
    2
    3

    private long time; // 任务可以被执行的时间,交付时间,以纳秒表示
    private final long period; // 0 表示非周期任务,正数表示 fixed-rate 模式的周期,负数表示 fixed-delay 模式

    fixed-rate:两次开始启动的间隔,fixed-delay:一次执行结束到下一次开始启动

  • 实际的任务对象:

    1
    2

    RunnableScheduledFuture<V> outerTask = this;
  • 任务在队列数组中的索引下标:

    1
    2
    3

    // DelayedWorkQueue 底层使用的数据结构是最小堆,记录当前任务在堆中的索引,-1 代表删除
    int heapIndex;

成员方法:

  • 构造方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    // 任务的触发时间
    this.time = ns;
    // 任务的周期,多长时间执行一次
    this.period = period;
    // 任务的序号
    this.sequenceNumber = sequencer.getAndIncrement();
    }
  • compareTo ():ScheduledFutureTask 根据执行时间 time 正序排列,如果执行时间相同,在按照序列号 sequenceNumber 正序排列,任务需要放入 DelayedWorkQueue,延迟队列中使用该方法按照从小到大进行排序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25

    public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
    return 0;
    if (other instanceof ScheduledFutureTask) {
    // 类型强转
    ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
    // 比较者 - 被比较者的执行时间
    long diff = time - x.time;
    // 比较者先执行
    if (diff < 0)
    return -1;
    // 被比较者先执行
    else if (diff > 0)
    return 1;
    // 比较者的序列号小
    else if (sequenceNumber < x.sequenceNumber)
    return -1;
    else
    return 1;
    }
    // 不是 ScheduledFutureTask 类型时,根据延迟时间排序
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }
  • run ():执行任务,非周期任务直接完成直接结束,周期任务执行完后会设置下一次的执行时间,重新放入线程池的阻塞队列,如果线程池中的线程数量少于核心线程,就会添加 Worker 开启新线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18

    public void run() {
    // 是否周期性,就是判断 period 是否为 0
    boolean periodic = isPeriodic();
    // 根据是否是周期任务检查当前状态能否执行任务,不能执行就取消任务
    if (!canRunInCurrentRunState(periodic))
    cancel(false);
    // 非周期任务,直接调用 FutureTask#run 执行
    else if (!periodic)
    ScheduledFutureTask.super.run();
    // 周期任务的执行,返回 true 表示执行成功
    else if (ScheduledFutureTask.super.runAndReset()) {
    // 设置周期任务的下一次执行时间
    setNextRunTime();
    // 任务的下一次执行安排,如果当前线程池状态可以执行周期任务,加入队列,并开启新线程
    reExecutePeriodic(outerTask);
    }
    }

    周期任务正常完成后任务的状态不会变化,依旧是 NEW,不会设置 outcome 属性。但是如果本次任务执行出现异常,会进入 setException 方法将任务状态置为异常,把异常保存在 outcome 中,方法返回 false,后续的该任务将不会再周期的执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31

    protected boolean runAndReset() {
    // 任务不是新建的状态了,或者被别的线程执行了,直接返回 false
    if (state != NEW ||
    !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
    return false;
    boolean ran = false;
    int s = state;
    try {
    Callable<V> c = callable;
    if (c != null && s == NEW) {
    try {
    // 执行方法,没有返回值
    c.call();
    ran = true;
    } catch (Throwable ex) {
    // 出现异常,把任务设置为异常状态,唤醒所有的 get 阻塞线程
    setException(ex);
    }
    }
    } finally {
    // 执行完成把执行线程引用置为 null
    runner = null;
    s = state;
    // 如果线程被中断进行中断处理
    if (s >= INTERRUPTING)
    handlePossibleCancellationInterrupt(s);
    }
    // 如果正常执行,返回 true,并且任务状态没有被取消
    return ran && s == NEW;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

    // 任务下一次的触发时间
    private void setNextRunTime() {
    long p = period;
    if (p > 0)
    // fixed-rate 模式,【时间设置为上一次执行任务的时间 + p】,两次任务执行的时间差
    time += p;
    else
    // fixed-delay 模式,下一次执行时间是【当前这次任务结束的时间(就是现在) + delay 值】
    time = triggerTime(-p);
    }
  • reExecutePeriodic():准备任务的下一次执行,重新放入阻塞任务队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15

    // ScheduledThreadPoolExecutor#reExecutePeriodic
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
    // 【放入任务队列】
    super.getQueue().add(task);
    // 如果提交完任务之后,线程池状态变为了 shutdown 状态,需要再次检查是否可以执行,
    // 如果不能执行且任务还在队列中未被取走,则取消任务
    if (!canRunInCurrentRunState(true) && remove(task))
    task.cancel(false);
    else
    // 当前线程池状态可以执行周期任务,加入队列,并【根据线程数量是否大于核心线程数确定是否开启新线程】
    ensurePrestart();
    }
    }
  • cancel ():取消任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    public boolean cancel(boolean mayInterruptIfRunning) {
    // 调用父类 FutureTask#cancel 来取消任务
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    // removeOnCancel 用于控制任务取消后是否应该从阻塞队列中移除
    if (cancelled && removeOnCancel && heapIndex >= 0)
    // 从等待队列中删除该任务,并调用 tryTerminate() 判断是否需要停止线程池
    remove(this);
    return cancelled;
    }

延迟队列

DelayedWorkQueue 是支持延时获取元素的阻塞队列,内部采用优先队列 PriorityQueue(小根堆、满二叉树)存储元素

其他阻塞队列存储节点的数据结构大都是链表,延迟队列是数组,所以延迟队列出队头元素后需要让其他元素(尾)替换到头节点,防止空指针异常

成员变量:

  • 容量:

    1
    2
    3
    4
    5

    private static final int INITIAL_CAPACITY = 16; // 初始容量
    private int size = 0; // 节点数量
    private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; // 存放节点
  • 锁:

    1
    2
    3

    private final ReentrantLock lock = new ReentrantLock(); // 控制并发
    private final Condition available = lock.newCondition();// 条件队列
  • 阻塞等待头节点的线程:线程池内的某个线程去 take () 获取任务时,如果延迟队列顶层节点不为 null(队列内有任务),但是节点任务还不到触发时间,线程就去检查队列的 leader 字段是否被占用

    • 如果未被占用,则当前线程占用该字段,然后当前线程到 available 条件队列指定超时时间 堆顶任务.time - now() 挂起
    • 如果被占用,当前线程直接到 available 条件队列不指定超时时间的挂起
    1
    2
    3

    // leader 在 available 条件队列内是首元素,它超时之后会醒过来,然后再次将堆顶元素获取走,获取走之后,take()结束之前,会调用是 available.signal() 唤醒下一个条件队列内的等待者,然后释放 lock,下一个等待者被唤醒后去到 AQS 队列,做 acquireQueue(node) 逻辑
    private Thread leader = null;

成员方法

  • offer ():插入节点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41

    public boolean offer(Runnable x) {
    // 判空
    if (x == null)
    throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    // 队列锁,增加删除数据时都要加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    int i = size;
    // 队列数量大于存放节点的数组长度,需要扩容
    if (i >= queue.length)
    // 扩容为原来长度的 1.5 倍
    grow();
    size = i + 1;
    // 当前是第一个要插入的节点
    if (i == 0) {
    queue[0] = e;
    // 修改 ScheduledFutureTask 的 heapIndex 属性,表示该对象在队列里的下标
    setIndex(e, 0);
    } else {
    // 向上调整元素的位置,并更新 heapIndex
    siftUp(i, e);
    }
    // 情况1:当前任务是第一个加入到 queue 内的任务,所以在当前任务加入到 queue 之前,take() 线程会直接
    // 到 available 队列不设置超时的挂起,并不会去占用 leader 字段,这时需会唤醒一个线程 让它去消费
    // 情况2:当前任务【优先级最高】,原堆顶任务可能还未到触发时间,leader 线程设置超时的在 available 挂起
    // 原先的 leader 等待的是原先的头节点,所以 leader 已经无效,需要将 leader 线程唤醒,
    // 唤醒之后它会检查堆顶,如果堆顶任务可以被消费,则直接获取走,否则继续成为 leader 等待新堆顶任务
    if (queue[0] == e) {
    // 将 leader 设置为 null
    leader = null;
    // 直接随便唤醒等待头结点的阻塞线程
    available.signal();
    }
    } finally {
    lock.unlock();
    }
    return true;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17

    // 插入新节点后对堆进行调整,进行节点上移,保持其特性【节点的值小于子节点的值】,小顶堆
    private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
    // 父节点,就是堆排序
    int parent = (k - 1) >>> 1;
    RunnableScheduledFuture<?> e = queue[parent];
    // key 和父节点比,如果大于父节点可以直接返回,否则就继续上浮
    if (key.compareTo(e) >= 0)
    break;
    queue[k] = e;
    setIndex(e, k);
    k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
    }
  • poll ():非阻塞获取头结点,获取执行时间最近并且可以执行的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18

    // 非阻塞获取
    public RunnableScheduledFuture<?> poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    // 获取队头节点,因为是小顶堆
    RunnableScheduledFuture<?> first = queue[0];
    // 头结点为空或者的延迟时间没到返回 null
    if (first == null || first.getDelay(NANOSECONDS) > 0)
    return null;
    else
    // 头结点达到延迟时间,【尾节点成为替代节点下移调整堆结构】,返回头结点
    return finishPoll(first);
    } finally {
    lock.unlock();
    }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16

    private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
    // 获取尾索引
    int s = --size;
    // 获取尾节点
    RunnableScheduledFuture<?> x = queue[s];
    // 将堆结构最后一个节点占用的 slot 设置为 null,因为该节点要尝试升级成堆顶,会根据特性下调
    queue[s] = null;
    // s == 0 说明 当前堆结构只有堆顶一个节点,此时不需要做任何的事情
    if (s != 0)
    // 从索引处 0 开始向下调整
    siftDown(0, x);
    // 出队的元素索引设置为 -1
    setIndex(f, -1);
    return f;
    }
  • take ():阻塞获取头节点,读取当前堆中最小的也就是触发时间最近的任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50

    public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 保证线程安全
    lock.lockInterruptibly();
    try {
    for (;;) {
    // 头节点
    RunnableScheduledFuture<?> first = queue[0];
    if (first == null)
    // 等待队列不空,直至有任务通过 offer 入队并唤醒
    available.await();
    else {
    // 获取头节点的延迟时间是否到时
    long delay = first.getDelay(NANOSECONDS);
    if (delay <= 0)
    // 到达触发时间,获取头节点并调整堆,重新选择延迟时间最小的节点放入头部
    return finishPoll(first);

    // 逻辑到这说明头节点的延迟时间还没到
    first = null;
    // 说明有 leader 线程在等待获取头节点,当前线程直接去阻塞等待
    if (leader != null)
    available.await();
    else {
    // 没有 leader 线程,【当前线程作为leader线程,并设置头结点的延迟时间作为阻塞时间】
    Thread thisThread = Thread.currentThread();
    leader = thisThread;
    try {
    // 在条件队列 available 使用带超时的挂起(堆顶任务.time - now() 纳秒值..)
    available.awaitNanos(delay);
    // 到达阻塞时间时,当前线程会从这里醒来来
    } finally {
    // t堆顶更新,leader 置为 null,offer 方法释放锁后,
    // 有其它线程通过 take/poll 拿到锁,读到 leader == null,然后将自身更新为leader。
    if (leader == thisThread)
    // leader 置为 null 用以接下来判断是否需要唤醒后继线程
    leader = null;
    }
    }
    }
    }
    } finally {
    // 没有 leader 线程,头结点不为 null,唤醒阻塞获取头节点的线程,
    // 【如果没有这一步,就会出现有了需要执行的任务,但是没有线程去执行】
    if (leader == null && queue[0] != null)
    available.signal();
    lock.unlock();
    }
    }
  • remove ():删除节点,堆移除一个元素的时间复杂度是 O (log n),延迟任务维护了 heapIndex,直接访问的时间复杂度是 O (1),从而可以更快的移除元素,任务在队列中被取消后会进入该逻辑

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31

    public boolean remove(Object x) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    // 查找对象在队列数组中的下标
    int i = indexOf(x);
    // 节点不存在,返回 false
    if (i < 0)
    return false;
    // 修改元素的 heapIndex,-1 代表删除
    setIndex(queue[i], -1);
    // 尾索引是长度-1
    int s = --size;
    // 尾节点作为替代节点
    RunnableScheduledFuture<?> replacement = queue[s];
    queue[s] = null;
    // s == i 说明头节点就是尾节点,队列空了
    if (s != i) {
    // 向下调整
    siftDown(i, replacement);
    // 说明没发生调整
    if (queue[i] == replacement)
    // 上移和下移不可能同时发生,替代节点大于子节点时下移,否则上移
    siftUp(i, replacement);
    }
    return true;
    } finally {
    lock.unlock();
    }
    }

成员方法

提交任务
  • schedule ():延迟执行方法,并指定执行的时间,默认是当前时间

    1
    2
    3
    4
    5

    public void execute(Runnable command) {
    // 以零延时任务的形式实现
    schedule(command, 0, NANOSECONDS);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    // 判空
    if (command == null || unit == null) throw new NullPointerException();
    // 没有做任何操作,直接将 task 返回,该方法主要目的是用于子类扩展,并且【根据延迟时间设置任务触发的时间点】
    RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(
    command, null, triggerTime(delay, unit)));
    // 延迟执行
    delayedExecute(t);
    return t;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

    // 返回【当前时间 + 延迟时间】,就是触发当前任务执行的时间
    private long triggerTime(long delay, TimeUnit unit) {
    // 设置触发的时间
    return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }
    long triggerTime(long delay) {
    // 如果 delay < Long.Max_VALUE/2,则下次执行时间为当前时间 +delay
    // 否则为了避免队列中出现由于溢出导致的排序紊乱,需要调用overflowFree来修正一下delay
    return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

    overflowFree 的原因:如果某个任务的 delay 为负数,说明当前可以执行(其实早该执行了)。阻塞队列中维护任务顺序是基于 compareTo 比较的,比较两个任务的顺序会用 time 相减。那么可能出现一个 delay 为正数减去另一个为负数的 delay,结果上溢为负数,则会导致 compareTo 产生错误的结果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

    private long overflowFree(long delay) {
    Delayed head = (Delayed) super.getQueue().peek();
    if (head != null) {
    long headDelay = head.getDelay(NANOSECONDS);
    // 判断一下队首的delay是不是负数,如果是正数就不用管,怎么减都不会溢出
    // 否则拿当前 delay 减去队首的 delay 来比较看,如果不出现上溢,排序不会乱
    // 不然就把当前 delay 值给调整为 Long.MAX_VALUE + 队首 delay
    if (headDelay < 0 && (delay - headDelay < 0))
    delay = Long.MAX_VALUE + headDelay;
    }
    return delay;
    }
  • scheduleAtFixedRate ():定时执行,一次任务的启动到下一次任务的启动的间隔

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,
    TimeUnit unit) {
    if (command == null || unit == null)
    throw new NullPointerException();
    if (period <= 0)
    throw new IllegalArgumentException();
    // 任务封装,【指定初始的延迟时间和周期时间】
    ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command, null,
    triggerTime(initialDelay, unit), unit.toNanos(period));
    // 默认返回本身
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 开始执行这个任务
    delayedExecute(t);
    return t;
    }
  • scheduleWithFixedDelay ():定时执行,一次任务的结束到下一次任务的启动的间隔

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,
    TimeUnit unit) {
    if (command == null || unit == null)
    throw new NullPointerException();
    if (delay <= 0)
    throw new IllegalArgumentException();
    // 任务封装,【指定初始的延迟时间和周期时间】,周期时间为 - 表示是 fixed-delay 模式
    ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null,
    triggerTime(initialDelay, unit), unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
    }
运行任务
  • delayedExecute():校验线程池状态,延迟或周期性任务的主要执行方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16

    private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 线程池是 SHUTDOWN 状态,需要执行拒绝策略
    if (isShutdown())
    reject(task);
    else {
    // 把当前任务放入阻塞队列,因为需要【获取执行时间最近的】,当前任务需要比较
    super.getQueue().add(task);
    // 线程池状态为 SHUTDOWN 并且不允许执行任务了,就从队列删除该任务,并设置任务的状态为取消状态
    if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task))
    task.cancel(false);
    else
    // 可以执行
    ensurePrestart();
    }
    }
  • ensurePrestart():开启线程执行任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    // ThreadPoolExecutor#ensurePrestart
    void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    // worker数目小于corePoolSize,则添加一个worker。
    if (wc < corePoolSize)
    // 第二个参数 true 表示采用核心线程数量限制,false 表示采用 maximumPoolSize
    addWorker(null, true);
    // corePoolSize = 0的情况,至少开启一个线程,【担保机制】
    else if (wc == 0)
    addWorker(null, false);
    }
  • canRunInCurrentRunState ():任务运行时都会被调用以校验当前状态是否可以运行任务

    1
    2
    3
    4
    5
    6

    boolean canRunInCurrentRunState(boolean periodic) {
    // 根据是否是周期任务判断,在线程池 shutdown 后是否继续执行该任务,默认非周期任务是继续执行的
    return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown :
    executeExistingDelayedTasksAfterShutdown);
    }
  • onShutdown ():删除并取消工作队列中的不需要再执行的任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30

    void onShutdown() {
    BlockingQueue<Runnable> q = super.getQueue();
    // shutdown 后是否仍然执行延时任务
    boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
    // shutdown 后是否仍然执行周期任务
    boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
    // 如果两者皆不可,则对队列中【所有任务】调用 cancel 取消并清空队列
    if (!keepDelayed && !keepPeriodic) {
    for (Object e : q.toArray())
    if (e instanceof RunnableScheduledFuture<?>)
    ((RunnableScheduledFuture<?>) e).cancel(false);
    q.clear();
    }
    else {
    for (Object e : q.toArray()) {
    if (e instanceof RunnableScheduledFuture) {
    RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
    // 不需要执行的任务删除并取消,已经取消的任务也需要从队列中删除
    if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
    t.isCancelled()) {
    if (q.remove(t))
    t.cancel(false);
    }
    }
    }
    }
    // 因为任务被从队列中清理掉,所以需要调用 tryTerminate 尝试【改变线程池的状态】
    tryTerminate();
    }