谈谈你对 AQS 的理解

1. * AQS 原理・语雀 (yuque.com)

从 ReentrantLock 的实现看 AQS 的原理及应用 - 美团技术团队 (meituan.com)

前言

AQS:AbstractQueuedSynchronizer 抽象队列同步器

AQS 是一个抽象类,是我们用到的锁的基础,例如我们经常用到的

  • ReentrantLock
  • Semaphore
  • CountdownLatch
  • ReentrantReadWriteLock
  • …..

上述的提到的这些,其实内部都是基于 AQS 来实现的。

我们举下面的一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Main {
public static int m = 0;

public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[100];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
m++;
}
});
}
for (Thread t: threads) {
t.start();
}
// 等待所有线程结束
for (Thread t: threads) {
t.join();
}
System.out.println(m);
}
}

运行结果,我们发现并不一样,这就是多线程在获取临界资源出现异常的情况

1
2
3
4
5
BASH
# 第一次
9985
# 第二次
10000

那么为什么会出现这样的问题呢?

image-20220824221442022

解决方法 1

针对上述的问题,就是因为线程来操作临界资源的时候,因为线程并发修改而造成数据不一致的问题,其实最简单的方法,就是通过引入 Synchronized 来给我们的对象上锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

public class Main2 {
public static int m = 0;

public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[100];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
synchronized (Main2.class) {
for (int j = 0; j < 100; j++) {
m++;
}
}
});
}
for (Thread t: threads) {
t.start();
}
// 等待所有线程结束
for (Thread t: threads) {
t.join();
}
System.out.println(m);
}
}

最后不管怎么执行,得到的结果都是 10000,为什么引入 Synchronized 就能让其保证数据一致呢?

通过 Synchronized 我们可以看到,它其实是对我们整个对象上锁,因为 m 是属于 static 修饰的静态变量,在 class 初始化的时候就已经创建,当第一个线程过来的时候,首先判断 Main2.class 是否已经加锁了,如果没有加锁,那么就进入,同时给对象加锁,关于 Synchronized 加锁 我们通过转换成字节码可以看到,其实就是添加两个关键字

1
2
3
4
PLAINTEXT
MonitorEnter
..... 被加锁的代码
MonitorExit

当第一个线程加锁后,其它线程在获取 Main2.class 对象的时候,就会进行阻塞,但是这里我们需要明白的是,因为 Synchronized 属于非公平锁,因此每个线程来的时候,都会尝试先获取锁,假设线程 1 还没有被执行,出现就绪状态,那么又可能会被其他线程剥夺 CPU 执行权,从而进入阻塞队列中。

但是如果线程 1 已经在执行的时候,它是不能被剥夺 CPU 执行权的,其它线程必须等待线程 1 执行完成后,释放锁才能够进入。

补充

关于 Synchronized 加锁,其实不是一来就是添加的重量级锁,而是有一个锁升级的过程

  • 首先第一个线程将会尝试添加一个偏向锁,也就是对当前获取的线程 1 有偏向的功能,即不进行复杂加锁校验等,在线程的头部信息中,是有这么一个记录用于记录偏向的线程的
  • 但是假设有多个线程来访问这个资源的时候,偏向锁就会升级成 CAS 轻量级锁,也就是我们所说的自旋锁,会不断的自旋操作来获取 CPU 资源
  • 但是假设某个线程长期进行自旋操作,而没有获取到锁,一般原来的版本是可以指定自旋次数的,后面的 JDK 进行优化,引入了适应性自旋。当某个线程长期获取不到资源的时候,就会升级成重量级锁,这个时候只要其它线程过来后,获取不到资源就会直接阻塞。

解决方法 2

除了刚刚说的引入 Synchronized 以外,还可以使用的就是引入 ReentrantLock 来实现,这里用到了可重入锁,也就是已获得锁的线程可以直接进入,其它的线程则需要等待该线程释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

public class Main3 {
public static int m = 0;
// ReentrantLock默认是非公平锁
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[100];
ReentrantLock lock = new ReentrantLock();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
lock.lock();
try {
for (int j = 0; j < 100; j++) {
m++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
}

for (Thread t: threads) {
t.start();
}
// 等待所有线程结束
for (Thread t: threads) {
t.join();
}
System.out.println(m);
}
}

疑问

为什么已经有了 Synchronized 加锁了,在后面又引入了很多新的锁呢,如 ReentrantLock 等

  • Synchronized 加锁是需要 JVM 调用底层的 OS 来进行加锁的,这样就存在一些开销
    • 程序需要从 用户态 -> 内核态 进行切换,这一部分是比较消耗时间的
  • 因为 ReentrantLock 属于 API 层面,不需要从进行资源的切换,也就是不用从 用户态 切换到 内核态

解决方法 3

另外一种方法,就是自己实现一把锁,也就是实现 Lock 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

/**
* 自定义锁
*
* @author: 陌溪
* @create: 2020-07-17-17:06
*/
public class MyLock implements Lock {
private volatile int i = 0;
@Override
public void lock() {
synchronized (this) {
// 判断是否有线程已经占用了锁
while(i != 0) {
try {
// 如果有线程占有锁,可以直接阻塞
this.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
i = 1;
}
}

@Override
public void lockInterruptibly() throws InterruptedException {

}

@Override
public boolean tryLock() {
return false;
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}

@Override
public void unlock() {
synchronized (this) {
i = 0;
// 唤醒所有线程
this.notifyAll();
}
}

@Override
public Condition newCondition() {
return null;
}
}

上述只是简单的实现了一下,还是用了 Synchronized,但是例如 ReentrantLock 这样的代码,Main4.java 的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

public class Main4 {
public static int m = 0;
// ReentrantLock默认是非公平锁
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[100];
Lock lock = new MyLock();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
lock.lock();
try {
for (int j = 0; j < 100; j++) {
m++;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
}

for (Thread t: threads) {
t.start();
}
// 等待所有线程结束
for (Thread t: threads) {
t.join();
}
System.out.println(m);
}
}

AQS

核心思想

AQS:AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架,许多同步类实现都依赖于该同步器

AQS 用状态属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁

  • 独占模式是只有一个线程能够访问资源,如 ReentrantLock
  • 共享模式允许多个线程访问资源,如 Semaphore,ReentrantReadWriteLock 是组合式

AQS 核心思想:

  • 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置锁定状态

  • 请求的共享资源被占用,AQS 用队列实现线程阻塞等待以及被唤醒时锁分配的机制,将暂时获取不到锁的线程加入到队列中

    CLH 是一种基于单向链表的高性能、公平的自旋锁,AQS 是将每条请求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配

    img

如上图所示,AQS 维护了一个 volatile int state 的变量 和 一个 FIFO 线程等待队列,多线程争用资源被阻塞的时候,就会进入这个队列中。state 就是共享资源,其访问方式有如下三种:

  • getState()
  • setState()
  • compareAndSetState()

AQS 定义了两种资源共享方式

  • Exclusive:独占,只有一个线程能执行,如 ReentrantLock
  • Share:共享,多个线程可以同时执行,如 Semaphore、CountDownLatch、ReadWriteLock、CycleBarrier

不同的自定义同步器争用共享资源的方式也不同


设计原理

设计原理:

  • 获取锁:

    1
    2
    3
    4
    5
    6
    7

    while(state 状态不允许获取) { // tryAcquire(arg)
    if(队列中还没有此线程) {
    入队并阻塞 park
    }
    }
    当前线程出队
  • 释放锁:

    1
    2
    3
    4

    if(state 状态允许了) { // tryRelease(arg)
    恢复阻塞的线程(s) unpark
    }

AbstractQueuedSynchronizer 中 state 设计:

  • state 使用了 32bit int 来维护同步状态,独占模式 0 表示未加锁状态,大于 0 表示已经加锁状态

    1
    2

    private volatile int state;
  • state 使用 volatile 修饰配合 cas 保证其修改时的原子性

  • state 表示线程重入的次数(独占模式)或者剩余许可数(共享模式)

  • state API:

    • protected final int getState():获取 state 状态
    • protected final void setState(int newState):设置 state 状态
    • protected final boolean compareAndSetState(int expect,int update)CAS 安全设置 state

封装线程的 Node 节点中 waitstate 设计:

  • 使用 volatile 修饰配合 CAS 保证其修改时的原子性

  • 表示 Node 节点的状态,有以下几种状态:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11

    // 默认为 0
    volatile int waitStatus;
    // 由于超时或中断,此节点被取消,不会再改变状态
    static final int CANCELLED = 1;
    // 此节点后面的节点已(或即将)被阻止(通过park),【当前节点在释放或取消时必须唤醒后面的节点】
    static final int SIGNAL = -1;
    // 此节点当前在条件队列中
    static final int CONDITION = -2;
    // 将releaseShared传播到其他节点
    static final int PROPAGATE = -3;

阻塞恢复设计:

  • 使用 park & unpark 来实现线程的暂停和恢复,因为命令的先后顺序不影响结果
  • park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细
  • park 线程可以通过 interrupt 打断

队列设计:

  • 使用了 FIFO 先入先出队列,并不支持优先级队列,同步队列是双向链表,便于出队入队

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20

    // 头结点,指向哑元节点
    private transient volatile Node head;
    // 阻塞队列的尾节点,阻塞队列不包含头结点,从 head.next → tail 认为是阻塞队列
    private transient volatile Node tail;

    static final class Node {
    // 枚举:共享模式
    static final Node SHARED = new Node();
    // 枚举:独占模式
    static final Node EXCLUSIVE = null;
    // node 需要构建成 FIFO 队列,prev 指向前继节点
    volatile Node prev;
    // next 指向后继节点
    volatile Node next;
    // 当前 node 封装的线程
    volatile Thread thread;
    // 条件队列是单向链表,只有后继指针,条件队列使用该属性
    Node nextWaiter;
    }

    img

  • 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet,条件队列是单向链表

    1
    2
    3
    4
    5
    6
    7

    public class ConditionObject implements Condition, java.io.Serializable {
    // 指向条件队列的第一个 node 节点
    private transient Node firstWaiter;
    // 指向条件队列的最后一个 node 节点
    private transient Node lastWaiter;
    }

模板对象

同步器的设计是基于模板方法模式,该模式是基于继承的,主要是为了在不改变模板结构的前提下在子类中重新定义模板中的内容以实现复用代码

  • 使用者继承 AbstractQueuedSynchronizer 并重写指定的方法
  • 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,这些模板方法会调用使用者重写的方法

AQS 使用了模板方法模式,自定义同步器时需要重写下面几个 AQS 提供的模板方法:

1
2
3
4
5
6

isHeldExclusively() //该线程是否正在独占资源。只有用到condition才需要去实现它
tryAcquire(int) //独占方式。尝试获取资源,成功则返回true,失败则返回false
tryRelease(int) //独占方式。尝试释放资源,成功则返回true,失败则返回false
tryAcquireShared(int) //共享方式。尝试获取资源。负数表示失败;0表示成功但没有剩余可用资源;正数表示成功且有剩余资源
tryReleaseShared(int) //共享方式。尝试释放资源,成功则返回true,失败则返回false
  • 默认情况下,每个方法都抛出 UnsupportedOperationException
  • 这些方法的实现必须是内部线程安全的
  • AQS 类中的其他方法都是 final ,所以无法被其他类使用,只有这几个方法可以被其他类使用

自定义

自定义一个不可重入锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

class MyLock implements Lock {
//独占锁 不可重入
class MySync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
// 加上锁 设置 owner 为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override //解锁
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);//volatile 修饰的变量放在后面,防止指令重排
return true;
}
@Override //是否持有独占锁
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition() {
return new ConditionObject();
}
}

private MySync sync = new MySync();

@Override //加锁(不成功进入等待队列等待)
public void lock() {
sync.acquire(1);
}

@Override //加锁 可打断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override //尝试加锁,尝试一次
public boolean tryLock() {
return sync.tryAcquire(1);
}

@Override //尝试加锁,带超时
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}

@Override //解锁
public void unlock() {
sync.release(1);
}

@Override //条件变量
public Condition newCondition() {
return sync.newCondition();
}
}

AQS 底层实现

AQS 使用了基于模板方法的设计模式,如果需要自定义同步器,一般的方式如下

  • 继承 AbstractQueuedSynchronizer,并重写指定的方法,在这里重写的方法就是对共享资源 state 获取和释放
  • 将 AQS 组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

我们通过下面的代码,来进行查看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

/**
* AQS
*/
public class Sync extends AbstractQueuedSynchronizer {

@Override
protected boolean tryAcquire(int arg) {
// 使用自旋锁 ,同时CAS必须保证原子性
// 目前的CPU底层汇编都有这条指令了,即支持原语操作
if (compareAndSetState(0, 1)) {
// 设置排它的拥有者,也就是互斥锁
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

@Override
protected boolean tryRelease(int arg) {
assert arg == 1;
if(!isHeldExclusively()) {
throw new IllegalMonitorStateException();
}
// 释放锁
setExclusiveOwnerThread(null);
setState(0);
return super.tryRelease(arg);
}

@Override
protected boolean isHeldExclusively() {
// 判断当前线程 是不是和排它锁的线程一样
return getExclusiveOwnerThread() == Thread.currentThread();
}
}

自定义同步器在实现的时候,只需要实现共享资源 state 的获取和释放即可,至于具体线程等待队列的维护,AQS 已经在顶层实现好了。自定义同步器实现的时候,主要实现下面几种方法:

  • isHeldExclusively ():该线程是否正在独占资源。只有用到 condition 才需要实现它
  • tryAcquire (int):独占方式。尝试获取资源,成功则返回 true,失败则返回 false。
  • tryRelease (int):独占方式,尝试释放资源,成功则返回 true,失败则返回 false
  • tryAcquireShared (int):共享方式,尝试获取资源。负数表示失败,0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared (int):共享方式。尝试释放资源,如果允许释放后允许唤醒后续等待节点返回 true,否则返回 false。

ReentrantLock

以 ReentrantLock(可重入独占式锁)为例,state 初始化为 0,表示未锁定状态,A 线程 lock () 时,会调用 tryAcquire () 独占锁,并将 state + 1,之后其它线程在想通过 tryAcquire 的时候就会失败,知道 A 线程 unlock () 到 state = 0 为止,其它线程才有机会获取到该锁。A 释放锁之前,自己也是可以重复获取此锁(state 累加),这就是可重入的概念。

注意:获取多少次锁就需要释放多少次锁,保证 state 是能够回到 0

CountDownLatch

以 CountDownLatch 为例,任务分 N 个子线程执行,state 就初始化为 N,N 个线程并行执行,每个线程执行完之后 countDown () 一次,state 就会 CAS 减 1,当 N 子线程全部执行完毕,state = 0,hui unpark () 主调动线程,主调用线程就会从 await () 函数返回,继续之后的动作。

总结

一般来说,自定义同步器要么独占方式,要么共享方式,他们也需要实现 tryAcquire 和 tryRelease、 tryAcquireShared 和 tryReleaseShared 中的一种即可。但 AQS 也支持自定义同步器实现独占和共享两种方式,比如 ReentrantLockReadWriteLock。

  • acquire () 和 acquireShared () 两种方式下,线程在等待队列中都是忽略中断的
  • acquireInterruptibly () 和 acquireSharedInterruptibly () 是支持响应中断的