CountDownLatch的使用
CountDownLatch
基本使用
CountDownLatch:计数器,用来进行线程同步协作,等待所有线程完成
构造器:
public CountDownLatch(int count)
:初始化唤醒需要的 down 几步
常用 API:
public void await()
:让当前线程等待,必须 down 完初始化的数字才可以被唤醒,否则进入无限等待public void countDown()
:计数器进行减 1(down 1)
应用:
应用之同步等待多线程准备完毕
1 |
|
中间输出
[t0(52%), t1(47%), t2(51%), t3(40%), t4(49%), t5(44%), t6(49%), t7(52%), t8(46%), t9(46%)]
最后输出
[t0(100%), t1(100%), t2(100%), t3(100%), t4(100%), t5(100%), t6(100%), t7(100%), t8(100%), t9(100%)]
游戏开始…
应用:
用来进行线程间的同步协作,等待所有线程完成倒计时
其中构造参数用来初始化等待计数值,await () 用来等待计数归零,countDown () 用来让计数减一
应用之同步等待多个远程调用结束
1 |
|
rest 远程调用
1 |
|
执行结果
1 |
|
实现原理
阻塞等待:
线程调用 await () 等待其他线程完成任务:支持打断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AbstractQueuedSynchronizer#acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 判断线程是否被打断,抛出打断异常
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取共享锁,条件成立说明 state > 0,此时线程入队阻塞等待,等待其他线程获取共享资源
// 条件不成立说明 state = 0,此时不需要阻塞线程,直接结束函数调用
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// CountDownLatch.Sync#tryAcquireShared
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}线程进入 AbstractQueuedSynchronizer#doAcquireSharedInterruptibly 函数阻塞挂起,等待 latch 变为 0:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 将调用latch.await()方法的线程 包装成 SHARED 类型的 node 加入到 AQS 的阻塞队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 前驱节点时头节点就可以尝试获取锁
if (p == head) {
// 再次尝试获取锁,获取成功返回 1
int r = tryAcquireShared(arg);
if (r >= 0) {
// 获取锁成功,设置当前节点为 head 节点,并且向后传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 阻塞在这里
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
// 阻塞线程被中断后抛出异常,进入取消节点的逻辑
if (failed)
cancelAcquire(node);
}
}获取共享锁成功,进入唤醒阻塞队列中与头节点相连的 SHARED 模式的节点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 将当前节点设置为新的 head 节点,前驱节点和持有线程置为 null
setHead(node);
// propagate = 1,条件一成立
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
// 获取当前节点的后继节点
Node s = node.next;
// 当前节点是尾节点时 next 为 null,或者后继节点是 SHARED 共享模式
if (s == null || s.isShared())
// 唤醒所有的等待共享锁的节点
doReleaseShared();
}
}
计数减一:
线程进入 countDown () 完成计数器减一(释放锁)的操作
1
2
3
4
5
6
7
8
9
10
11
12
13
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
// 尝试释放共享锁
if (tryReleaseShared(arg)) {
// 释放锁成功开始唤醒阻塞节点
doReleaseShared();
return true;
}
return false;
}更新 state 值,每调用一次,state 值减一,当 state -1 正好为 0 时,返回 true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
// 条件成立说明前面【已经有线程触发唤醒操作】了,这里返回 false
if (c == 0)
return false;
// 计数器减一
int nextc = c-1;
if (compareAndSetState(c, nextc))
// 计数器为 0 时返回 true
return nextc == 0;
}
}state = 0 时,当前线程需要执行唤醒阻塞节点的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void doReleaseShared() {
for (;;) {
Node h = head;
// 判断队列是否是空队列
if (h != null && h != tail) {
int ws = h.waitStatus;
// 头节点的状态为 signal,说明后继节点没有被唤醒过
if (ws == Node.SIGNAL) {
// cas 设置头节点的状态为 0,设置失败继续自旋
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒后继节点
unparkSuccessor(h);
}
// 如果有其他线程已经设置了头节点的状态,重新设置为 PROPAGATE 传播属性
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 条件不成立说明被唤醒的节点非常积极,直接将自己设置为了新的head,
// 此时唤醒它的节点(前驱)执行 h == head 不成立,所以不会跳出循环,会继续唤醒新的 head 节点的后继节点
if (h == head)
break;
}
}
CyclicBarrier
基本使用
CyclicBarrier: 循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要 “同步” 的时刻调用 await () 方法进行等待,当等待的线程数满足『计数个数』时,继续执行.
常用方法:
- ```
public CyclicBarrier(int parties, Runnable barrierAction)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
:用于在线程到达屏障 parties 时,执行 barrierAction
- parties:代表多少个线程到达屏障开始触发线程任务
- barrierAction:线程任务
- `public int await()`:线程调用 await 方法通知 CyclicBarrier 本线程已经到达屏障
与 CountDownLatch 的区别:CyclicBarrier 是可以重用的
```JAVA
CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行
new Thread(()->{
System.out.println("线程1开始.."+new Date());
try {
cb.await(); // 当个数不足时,等待
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程1继续向下运行..."+new Date());
}).start();
new Thread(()->{
System.out.println("线程2开始.."+new Date());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
try {
cb.await(); // 2 秒后,线程个数够2,继续运行
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("线程2继续向下运行..."+new Date());
}).start();
JAVA
package hm.Semaphore;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierTest {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(2);
CyclicBarrier barrier = new CyclicBarrier(2, () -> {
System.out.println("task1 task2 finish...");
});
for (int i = 0; i < 3; i++) { // 循环重用
service.submit(() -> {
System.out.println("task1 begin...");
try {
Thread.sleep(1000);
barrier.await(); // 2 - 1 = 1
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
service.submit(() -> {
System.out.println("task2 begin...");
try {
Thread.sleep(2000);
barrier.await(); // 1 - 1 = 0
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
service.shutdown();
}
}
JAVA
task1 begin...
task2 begin...
task1 task2 finish...
task1 begin...
task2 begin...
task1 task2 finish...
task1 begin...
task2 begin...
task1 task2 finish...
Process finished with exit code 0