阻塞队列
阻塞队列
基本介绍
有界队列和无界队列:
- 有界队列:有固定大小的队列,比如设定了固定大小的LinkedBlockingQueue,又或者大小为0
- 无界队列:没有设置固定的队列,这些队列可以直接入队,直到溢出(超过 Integer.MAX_VALUE),所以相当于无界
java.util.concurrent.BlockingQueue接口有以下阻塞队列的实现:FIFO队列
- ArrayBlockQueue:由数组结构组成的有界阻塞队列
- LinkedBlockingQueue:由链表结构组成的无界(默认大小 Integer.MAX_VALUE)的阻塞队列
- PriorityBlockQueue:支持优先级排序的无界阻塞队列
- DelayedWorkQueue:使用优先级队列实现的延迟无界阻塞队列
- SynchronousQueue:不存储元素的阻塞队列,每一个生产线程会阻塞到有一个 put 的线程放入元素为止
- LinkedTransferQueue:由链表结构组成的无界阻塞队列
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列
与普通队列不同的是(LinkedList、ArrayList等)的不同点在于阻塞队列中阻塞添加和阻塞删除方法,以及线程安全:
阻塞添加put():当阻塞队列元素已满时,添加队列元素的线程会被阻塞,直到元素不满时才重新唤醒线程执行。
阻塞删除take():在阻塞队列元素已满时,添加队列的线程会被阻塞,直到队列的元素 不满时才重新唤醒线程执行
链表队列
入队出队
LinkedBlockingQueue源码:
1 | public class LinkedBlockingQueue<E> extends AbstractQueue<E> |
入队:尾插法
初始化链表
last = head = new Node<E>(null)
,Dummy 节点用来占位,item 为 null1
2
3
4
5
6public LinkedBlockingQueue(int capacity) {
// 默认是 Integer.MAX_VALUE
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}当一个节点入队:
1
2
3
4private void enqueue(Node<E> node) {
// 从右向左计算
last = last.next = node;
}再来一个节点入队
last = last.next = node
出队:出队头节点,FIFO
出队源码:
1
2
3
4
5
6
7
8
9
10
11
12
13private E dequeue() {
Node<E> h = head;
// 获取临头节点
Node<E> first = h.next;
// 自己指向自己,help GC
h.next = h;
head = first;
// 出队的元素
E x = first.item;
// 【当前节点置为 Dummy 节点】
first.item = null;
return x;
}h = head
→first = h.next
h.next = h
→head = first
first.item = null
:当前节点置为 Dummy 节点
同步队列
成员属性
SynchronousQueue 是一个不存储元素的 BlockingQueue,每一个生产者必须阻塞匹配到一个消费者
成员变量:
运行当前程序的平台拥有 CPU 的数量:
1
static final int NCPUS = Runtime.getRuntime().availableProcessors()
指定超时时间后,当前线程最大自旋次数:
1
2// 只有一个 CPU 时自旋次数为 0,所有程序都是串行执行,多核 CPU 时自旋 32 次是一个经验值
static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;自旋的原因:线程挂起唤醒需要进行上下文切换,涉及到用户态和内核态的转变,是非常消耗资源的。自旋期间线程会一直检查自己的状态是否被匹配到,如果自旋期间被匹配到,那么直接就返回了,如果自旋次数达到某个指标后,还是会将当前线程挂起
未指定超时时间,当前线程最大自旋次数:
1
static final int maxUntimedSpins = maxTimedSpins * 16; // maxTimedSpins 的 16 倍
指定超时限制的阈值,小于该值的线程不会被挂起:
1
static final long spinForTimeoutThreshold = 1000L; // 纳秒
超时时间设置的小于该值,就会被禁止挂起,阻塞再唤醒的成本太高,不如选择自旋空转
转换器:
1
2
3
4
5
6
7
8
9
10
11
12private transient volatile Transferer<E> transferer;
abstract static class Transferer<E> {
/**
* 参数一:可以为 null,null 时表示这个请求是一个 REQUEST 类型的请求,反之是一个 DATA 类型的请求
* 参数二:如果为 true 表示指定了超时时间,如果为 false 表示不支持超时,会一直阻塞到匹配或者被打断
* 参数三:超时时间限制,单位是纳秒
* 返回值:返回值如果不为 null 表示匹配成功,DATA 类型的请求返回当前线程 put 的数据
* 如果返回 null,表示请求超时或被中断
*/
abstract E transfer(E e, boolean timed, long nanos);
}构造方法:
1
2
3
4
5public SynchronousQueue(boolean fair) {
// fair 默认 false
// 非公平模式实现的数据结构是栈,公平模式的数据结构是队列
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}成员方法:
1
2
3
4
5
6
7public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
public E poll() {
return transferer.transfer(null, true, 0);
}
非公实现
TransferStack 是非公平的同步队列,因为所有的请求都被压入栈中,栈顶的元素会最先得到匹配,造成栈底的等待线程饥饿
TransferStack 类成员变量:
请求类型:
1
2
3
4
5
6
7
8// 表示 Node 类型为请求类型
static final int REQUEST = 0;
// 表示 Node类 型为数据类型
static final int DATA = 1;
// 表示 Node 类型为匹配中类型
// 假设栈顶元素为 REQUEST-NODE,当前请求类型为 DATA,入栈会修改类型为 FULFILLING 【栈顶 & 栈顶之下的一个node】
// 假设栈顶元素为 DATA-NODE,当前请求类型为 REQUEST,入栈会修改类型为 FULFILLING 【栈顶 & 栈顶之下的一个node】
static final int FULFILLING = 2;栈顶元素:
1
volatile SNode head;
内部类 SNode:
成员变量:
1
2
3
4
5
6
7
8
9
10
11
12
13
14static final class SNode {
// 指向下一个栈帧
volatile SNode next;
// 与当前 node 匹配的节点
volatile SNode match;
// 假设当前node对应的线程自旋期间未被匹配成功,那么node对应的线程需要挂起,
// 挂起前 waiter 保存对应的线程引用,方便匹配成功后,被唤醒。
volatile Thread waiter;
// 数据域,不为空表示当前 Node 对应的请求类型为 DATA 类型,反之则表示 Node 为 REQUEST 类型
Object item;
// 表示当前Node的模式 【DATA/REQUEST/FULFILLING】
int mode;
}构造方法:
1
2
3SNode(Object item) {
this.item = item;
}设置方法:设置 Node 对象的 next 字段,此处对 CAS 进行了优化,提升了 CAS 的效率
1
2
3
4boolean casNext(SNode cmp, SNode val) {
//【优化:cmp == next】,可以提升一部分性能。 cmp == next 不相等,就没必要走 cas指令。
return cmp == next && UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}匹配方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17boolean tryMatch(SNode s) {
// 当前 node 尚未与任何节点发生过匹配,CAS 设置 match 字段为 s 节点,表示当前 node 已经被匹配
if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
// 当前 node 如果自旋结束,会 park 阻塞,阻塞前将 node 对应的 Thread 保留到 waiter 字段
// 获取当前 node 对应的阻塞线程
Thread w = waiter;
// 条件成立说明 node 对应的 Thread 正在阻塞
if (w != null) {
waiter = null;
// 使用 unpark 方式唤醒线程
LockSupport.unpark(w);
}
return true;
}
// 匹配成功返回 true
return match == s;
}取消方法:
1
2
3
4
5
6
7
8
9// 取消节点的方法
void tryCancel() {
// match 字段指向自己,表示这个 node 是取消状态,取消状态的 node,最终会被强制移除出栈
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
boolean isCancelled() {
return match == this;
}
TransferStack 类成员方法:
snode():填充节点方法
1
2
3
4
5
6
7
8static SNode snode(SNode s, Object e, SNode next, int mode) {
// 引用指向空时,snode 方法会创建一个 SNode 对象
if (s == null) s = new SNode(e);
// 填充数据
s.mode = mode;
s.next = next;
return s;
}transfer():核心方法,请求匹配出栈,不匹配阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85E transfer(E e, boolean timed, long nanos) {
// 包装当前线程的 node
SNode s = null;
// 根据元素判断当前的请求类型
int mode = (e == null) ? REQUEST : DATA;
// 自旋
for (;;) {
// 获取栈顶指针
SNode h = head;
// 【CASE1】:当前栈为空或者栈顶 node 模式与当前请求模式一致无法匹配,做入栈操作
if (h == null || h.mode == mode) {
// 当前请求是支持超时的,但是 nanos <= 0 说明这个请求不支持 “阻塞等待”
if (timed && nanos <= 0) {
// 栈顶元素是取消状态
if (h != null && h.isCancelled())
// 栈顶出栈,设置新的栈顶
casHead(h, h.next);
else
// 表示【匹配失败】
return null;
// 入栈
} else if (casHead(h, s = snode(s, e, h, mode))) {
// 等待被匹配的逻辑,正常情况返回匹配的节点;取消情况返回当前节点,就是 s
SNode m = awaitFulfill(s, timed, nanos);
// 说明当前 node 是【取消状态】
if (m == s) {
// 将取消节点出栈
clean(s);
return null;
}
// 执行到这说明【匹配成功】了
// 栈顶有节点并且 匹配节点还未出栈,需要协助出栈
if ((h = head) != null && h.next == s)
casHead(h, s.next);
// 当前 node 模式为 REQUEST 类型,返回匹配节点的 m.item 数据域
// 当前 node 模式为 DATA 类型:返回 node.item 数据域,当前请求提交的数据 e
return (E) ((mode == REQUEST) ? m.item : s.item);
}
// 【CASE2】:逻辑到这说明请求模式不一致,如果栈顶不是 FULFILLING 说明没被其他节点匹配,【当前可以匹配】
} else if (!isFulfilling(h.mode)) {
// 头节点是取消节点,match 指向自己,协助出栈
if (h.isCancelled())
casHead(h, h.next);
// 入栈当前请求的节点
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) {
// m 是 s 的匹配的节点
SNode m = s.next;
// m 节点在 awaitFulfill 方法中被中断,clean 了自己
if (m == null) {
// 清空栈
casHead(s, null);
s = null;
// 返回到外层自旋中
break;
}
// 获取匹配节点的下一个节点
SNode mn = m.next;
// 尝试匹配,【匹配成功】,则将 fulfilling 和 m 一起出栈,并且唤醒被匹配的节点的线程
if (m.tryMatch(s)) {
casHead(s, mn);
return (E) ((mode == REQUEST) ? m.item : s.item);
} else
// 匹配失败,出栈 m
s.casNext(m, mn);
}
}
// 【CASE3】:栈顶模式为 FULFILLING 模式,表示【栈顶和栈顶下面的节点正在发生匹配】,当前请求需要做协助工作
} else {
// h 表示的是 fulfilling 节点,m 表示 fulfilling 匹配的节点
SNode m = h.next;
if (m == null)
// 清空栈
casHead(h, null);
else {
SNode mn = m.next;
// m 和 h 匹配,唤醒 m 中的线程
if (m.tryMatch(h))
casHead(h, mn);
else
h.casNext(m, mn);
}
}
}
}awaitFulfill():阻塞当前线程等待被匹配,返回匹配的节点,或者被取消的节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 等待的截止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 当前线程
Thread w = Thread.currentThread();
// 表示当前请求线程在下面的 for(;;) 自旋检查的次数
int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
// 自旋检查逻辑:是否匹配、是否超时、是否被中断
for (;;) {
// 当前线程收到中断信号,需要设置 node 状态为取消状态
if (w.isInterrupted())
s.tryCancel();
// 获取与当前 s 匹配的节点
SNode m = s.match;
if (m != null)
// 可能是正常的匹配的,也可能是取消的
return m;
// 执行了超时限制就判断是否超时
if (timed) {
nanos = deadline - System.nanoTime();
// 【超时了,取消节点】
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
// 说明当前线程还可以进行自旋检查
if (spins > 0)
// 自旋一次 递减 1
spins = shouldSpin(s) ? (spins - 1) : 0;
// 说明没有自旋次数了
else if (s.waiter == null)
//【把当前 node 对应的 Thread 保存到 node.waiter 字段中,要阻塞了】
s.waiter = w;
// 没有超时限制直接阻塞
else if (!timed)
LockSupport.park(this);
// nanos > 1000 纳秒的情况下,才允许挂起当前线程
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}1
2
3
4
5
6
7
8boolean shouldSpin(SNode s) {
// 获取栈顶
SNode h = head;
// 条件一成立说明当前 s 就是栈顶,允许自旋检查
// 条件二成立说明当前 s 节点自旋检查期间,又来了一个与当前 s 节点匹配的请求,双双出栈后条件会成立
// 条件三成立前提当前 s 不是栈顶元素,并且当前栈顶正在匹配中,这种状态栈顶下面的元素,都允许自旋检查
return (h == s || h == null || isFulfilling(h.mode));
}clear():指定节点出栈
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25void clean(SNode s) {
// 清空数据域和关联线程
s.item = null;
s.waiter = null;
// 获取取消节点的下一个节点
SNode past = s.next;
// 判断后继节点是不是取消节点,是就更新 past
if (past != null && past.isCancelled())
past = past.next;
SNode p;
// 从栈顶开始向下检查,【将栈顶开始向下的 取消状态 的节点全部清理出去】,直到碰到 past 或者不是取消状态为止
while ((p = head) != null && p != past && p.isCancelled())
// 修改的是内存地址对应的值,p 指向该内存地址所以数据一直在变化
casHead(p, p.next);
// 说明中间遇到了不是取消状态的节点,继续迭代下去
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
公平实现
TransferQueue 是公平的同步队列,采用 FIFO 的队列实现,请求节点与队尾模式不同,需要与队头发生匹配
TransferQueue 类成员变量:
指向队列的 dummy 节点:
1
transient volatile QNode head;
指向队列的尾节点:
1
transient volatile QNode tail;
被清理节点的前驱节点:
1
transient volatile QNode cleanMe;
入队操作是两步完成的,第一步是 t.next = newNode,第二步是 tail = newNode,所以队尾节点出队,是一种非常特殊的情况
TransferQueue 内部类:
QNode:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32static final class QNode {
// 指向当前节点的下一个节点
volatile QNode next;
// 数据域,Node 代表的是 DATA 类型 item 表示数据,否则 Node 代表的 REQUEST 类型,item == null
volatile Object item;
// 假设当前 node 对应的线程自旋期间未被匹配成功,那么 node 对应的线程需要挂起,
// 挂起前 waiter 保存对应的线程引用,方便匹配成功后被唤醒。
volatile Thread waiter;
// true 当前 Node 是一个 DATA 类型,false 表示当前 Node 是一个 REQUEST 类型
final boolean isData;
// 构建方法
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
// 尝试取消当前 node,取消状态的 node 的 item 域指向自己
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
// 判断当前 node 是否为取消状态
boolean isCancelled() {
return item == this;
}
// 判断当前节点是否 “不在” 队列内,当 next 指向自己时,说明节点已经出队。
boolean isOffList() {
return next == this;
}
}
TransferQueue 类成员方法:
设置头尾节点:
1
2
3
4
5
6
7
8
9
10
11void advanceHead(QNode h, QNode nh) {
// 设置头指针指向新的节点,
if (h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
// 老的头节点出队
h.next = h;
}
void advanceTail(QNode t, QNode nt) {
if (tail == t)
// 更新队尾节点为新的队尾
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}transfer():核心方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78E transfer(E e, boolean timed, long nanos) {
// s 指向当前请求对应的 node
QNode s = null;
// 是否是 DATA 类型的请求
boolean isData = (e != null);
// 自旋
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null)
continue;
// head 和 tail 同时指向 dummy 节点,说明是空队列
// 队尾节点与当前请求类型是一致的情况,说明阻塞队列中都无法匹配,
if (h == t || t.isData == isData) {
// 获取队尾 t 的 next 节点
QNode tn = t.next;
// 多线程环境中其他线程可能修改尾节点
if (t != tail)
continue;
// 已经有线程入队了,更新 tail
if (tn != null) {
advanceTail(t, tn);
continue;
}
// 允许超时,超时时间小于 0,这种方法不支持阻塞等待
if (timed && nanos <= 0)
return null;
// 创建 node 的逻辑
if (s == null)
s = new QNode(e, isData);
// 将 node 添加到队尾
if (!t.casNext(null, s))
continue;
// 更新队尾指针
advanceTail(t, s);
// 当前节点 等待匹配....
Object x = awaitFulfill(s, e, timed, nanos);
// 说明【当前 node 状态为 取消状态】,需要做出队逻辑
if (x == s) {
clean(t, s);
return null;
}
// 说明当前 node 仍然在队列内,匹配成功,需要做出队逻辑
if (!s.isOffList()) {
// t 是当前 s 节点的前驱节点,判断 t 是不是头节点,是就更新 dummy 节点为 s 节点
advanceHead(t, s);
// s 节点已经出队,所以需要把它的 item 域设置为它自己,表示它是个取消状态
if (x != null)
s.item = s;
s.waiter = null;
}
return (x != null) ? (E)x : e;
// 队尾节点与当前请求节点【互补匹配】
} else {
// h.next 节点,【请求节点与队尾模式不同,需要与队头发生匹配】,TransferQueue 是一个【公平模式】
QNode m = h.next;
// 并发导致其他线程修改了队尾节点,或者已经把 head.next 匹配走了
if (t != tail || m == null || h != head)
continue;
// 获取匹配节点的数据域保存到 x
Object x = m.item;
// 判断是否匹配成功
if (isData == (x != null) ||
x == m ||
!m.casItem(x, e)) {
advanceHead(h, m);
continue;
}
// 【匹配完成】,将头节点出队,让这个新的头结点成为 dummy 节点
advanceHead(h, m);
// 唤醒该匹配节点的线程
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;
}
}
}awaitFulfill():阻塞当前线程等待被匹配
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
// 表示等待截止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 自选检查的次数
int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
// 被打断就取消节点
if (w.isInterrupted())
s.tryCancel(e);
// 获取当前 Node 数据域
Object x = s.item;
// 当前请求为 DATA 模式时:e 请求带来的数据
// s.item 修改为 this,说明当前 QNode 对应的线程 取消状态
// s.item 修改为 null 表示已经有匹配节点了,并且匹配节点拿走了 item 数据
// 当前请求为 REQUEST 模式时:e == null
// s.item 修改为 this,说明当前 QNode 对应的线程 取消状态
// s.item != null 且 item != this 表示当前 REQUEST 类型的 Node 已经匹配到 DATA 了
if (x != e)
return x;
// 超时检查
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 自旋次数减一
if (spins > 0)
--spins;
// 没有自旋次数了,把当前线程封装进去 waiter
else if (s.waiter == null)
s.waiter = w;
// 阻塞
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}