CountDownLatch

基本使用

CountDownLatch:计数器,用来进行线程同步协作,等待所有线程完成

构造器:

  • public CountDownLatch(int count):初始化唤醒需要的 down 几步

常用 API:

  • public void await() :让当前线程等待,必须 down 完初始化的数字才可以被唤醒,否则进入无限等待
  • public void countDown():计数器进行减 1(down 1)

应用:

应用之同步等待多线程准备完毕

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

// LOL 10人进入游戏倒计时
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);
ExecutorService service = Executors.newFixedThreadPool(10);
String[] all = new String[10];
Random random = new Random();

for (int j = 0; j < 10; j++) {
int finalJ = j;//常量
service.submit(() -> {
for (int i = 0; i <= 100; i++) {
Thread.sleep(random.nextInt(100)); //随机休眠
all[finalJ] = i + "%";
System.out.print("\r" + Arrays.toString(all)); // \r代表覆盖
}
latch.countDown();
});
}
latch.await();
System.out.println("\n游戏开始");
service.shutdown();
}

中间输出

[t0(52%), t1(47%), t2(51%), t3(40%), t4(49%), t5(44%), t6(49%), t7(52%), t8(46%), t9(46%)]

最后输出

[t0(100%), t1(100%), t2(100%), t3(100%), t4(100%), t5(100%), t6(100%), t7(100%), t8(100%), t9(100%)]
游戏开始…

应用:

用来进行线程间的同步协作,等待所有线程完成倒计时

其中构造参数用来初始化等待计数值,await () 用来等待计数归零,countDown () 用来让计数减一

应用之同步等待多个远程调用结束

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

@RestController
public class TestCountDownlatchController {
@GetMapping("/order/{id}")
public Map<String, Object> order(@PathVariable int id) {
HashMap<String, Object> map = new HashMap<>();
map.put("id", id);
map.put("total", "2300.00");
sleep(2000);
return map;
}

@GetMapping("/product/{id}")
public Map<String, Object> product(@PathVariable int id) {
HashMap<String, Object> map = new HashMap<>();
if (id == 1) {
map.put("name", "小爱音箱");
map.put("price", 300);
} else if (id == 2) {
map.put("name", "小米手机");
map.put("price", 2000);
}
map.put("id", id);
sleep(1000);
return map;
}

@GetMapping("/logistics/{id}")
public Map<String, Object> logistics(@PathVariable int id) {
HashMap<String, Object> map = new HashMap<>();
map.put("id", id);
map.put("name", "中通快递");
sleep(2500);
return map;
}

private void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

rest 远程调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

RestTemplate restTemplate = new RestTemplate();
log.debug("begin");
ExecutorService service = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(4);
Future<Map<String,Object>> f1 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);
return r;
});
Future<Map<String, Object>> f2 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);
return r;
});
Future<Map<String, Object>> f3 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2);
return r;
});
Future<Map<String, Object>> f4 = service.submit(() -> {
Map<String, Object> r =
restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1);
return r;
});
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
System.out.println(f4.get());
log.debug("执行完毕");
service.shutdown();

执行结果

1
2
3
4
5
6
7

19:51:39.711 c.TestCountDownLatch [main] - begin
{total=2300.00, id=1}
{price=300, name=小爱音箱, id=1}
{price=2000, name=小米手机, id=2}
{name=中通快递, id=1}
19:51:42.407 c.TestCountDownLatch [main] - 执行完毕

实现原理

阻塞等待:

  • 线程调用 await () 等待其他线程完成任务:支持打断

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18

    public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
    }
    // AbstractQueuedSynchronizer#acquireSharedInterruptibly
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    // 判断线程是否被打断,抛出打断异常
    if (Thread.interrupted())
    throw new InterruptedException();
    // 尝试获取共享锁,条件成立说明 state > 0,此时线程入队阻塞等待,等待其他线程获取共享资源
    // 条件不成立说明 state = 0,此时不需要阻塞线程,直接结束函数调用
    if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
    }
    // CountDownLatch.Sync#tryAcquireShared
    protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
    }
  • 线程进入 AbstractQueuedSynchronizer#doAcquireSharedInterruptibly 函数阻塞挂起,等待 latch 变为 0:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31

    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    // 将调用latch.await()方法的线程 包装成 SHARED 类型的 node 加入到 AQS 的阻塞队列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
    for (;;) {
    // 获取当前节点的前驱节点
    final Node p = node.predecessor();
    // 前驱节点时头节点就可以尝试获取锁
    if (p == head) {
    // 再次尝试获取锁,获取成功返回 1
    int r = tryAcquireShared(arg);
    if (r >= 0) {
    // 获取锁成功,设置当前节点为 head 节点,并且向后传播
    setHeadAndPropagate(node, r);
    p.next = null; // help GC
    failed = false;
    return;
    }
    }
    // 阻塞在这里
    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    throw new InterruptedException();
    }
    } finally {
    // 阻塞线程被中断后抛出异常,进入取消节点的逻辑
    if (failed)
    cancelAcquire(node);
    }
    }
  • 获取共享锁成功,进入唤醒阻塞队列中与头节点相连的 SHARED 模式的节点:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15

    private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    // 将当前节点设置为新的 head 节点,前驱节点和持有线程置为 null
    setHead(node);
    // propagate = 1,条件一成立
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
    // 获取当前节点的后继节点
    Node s = node.next;
    // 当前节点是尾节点时 next 为 null,或者后继节点是 SHARED 共享模式
    if (s == null || s.isShared())
    // 唤醒所有的等待共享锁的节点
    doReleaseShared();
    }
    }

计数减一:

  • 线程进入 countDown () 完成计数器减一(释放锁)的操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

    public void countDown() {
    sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
    // 尝试释放共享锁
    if (tryReleaseShared(arg)) {
    // 释放锁成功开始唤醒阻塞节点
    doReleaseShared();
    return true;
    }
    return false;
    }
  • 更新 state 值,每调用一次,state 值减一,当 state -1 正好为 0 时,返回 true

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14

    protected boolean tryReleaseShared(int releases) {
    for (;;) {
    int c = getState();
    // 条件成立说明前面【已经有线程触发唤醒操作】了,这里返回 false
    if (c == 0)
    return false;
    // 计数器减一
    int nextc = c-1;
    if (compareAndSetState(c, nextc))
    // 计数器为 0 时返回 true
    return nextc == 0;
    }
    }
  • state = 0 时,当前线程需要执行唤醒阻塞节点的任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25

    private void doReleaseShared() {
    for (;;) {
    Node h = head;
    // 判断队列是否是空队列
    if (h != null && h != tail) {
    int ws = h.waitStatus;
    // 头节点的状态为 signal,说明后继节点没有被唤醒过
    if (ws == Node.SIGNAL) {
    // cas 设置头节点的状态为 0,设置失败继续自旋
    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
    continue;
    // 唤醒后继节点
    unparkSuccessor(h);
    }
    // 如果有其他线程已经设置了头节点的状态,重新设置为 PROPAGATE 传播属性
    else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
    continue;
    }
    // 条件不成立说明被唤醒的节点非常积极,直接将自己设置为了新的head,
    // 此时唤醒它的节点(前驱)执行 h == head 不成立,所以不会跳出循环,会继续唤醒新的 head 节点的后继节点
    if (h == head)
    break;
    }
    }

CyclicBarrier

基本使用

CyclicBarrier: 循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要 “同步” 的时刻调用 await () 方法进行等待,当等待的线程数满足『计数个数』时,继续执行.

常用方法:

  • ```
    public CyclicBarrier(int parties, Runnable barrierAction)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91

    :用于在线程到达屏障 parties 时,执行 barrierAction

    - parties:代表多少个线程到达屏障开始触发线程任务
    - barrierAction:线程任务

    - `public int await()`:线程调用 await 方法通知 CyclicBarrier 本线程已经到达屏障

    与 CountDownLatch 的区别:CyclicBarrier 是可以重用的

    ```JAVA

    CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行

    new Thread(()->{
    System.out.println("线程1开始.."+new Date());
    try {
    cb.await(); // 当个数不足时,等待
    } catch (InterruptedException | BrokenBarrierException e) {
    e.printStackTrace();
    }
    System.out.println("线程1继续向下运行..."+new Date());
    }).start();

    new Thread(()->{
    System.out.println("线程2开始.."+new Date());
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    }
    try {
    cb.await(); // 2 秒后,线程个数够2,继续运行
    } catch (InterruptedException | BrokenBarrierException e) {
    e.printStackTrace();
    }
    System.out.println("线程2继续向下运行..."+new Date());
    }).start();
    JAVA
    package hm.Semaphore;

    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class CyclicBarrierTest {

    public static void main(String[] args) {
    ExecutorService service = Executors.newFixedThreadPool(2);
    CyclicBarrier barrier = new CyclicBarrier(2, () -> {
    System.out.println("task1 task2 finish...");
    });

    for (int i = 0; i < 3; i++) { // 循环重用
    service.submit(() -> {
    System.out.println("task1 begin...");
    try {
    Thread.sleep(1000);
    barrier.await(); // 2 - 1 = 1
    } catch (InterruptedException | BrokenBarrierException e) {
    e.printStackTrace();
    }
    });

    service.submit(() -> {
    System.out.println("task2 begin...");
    try {
    Thread.sleep(2000);
    barrier.await(); // 1 - 1 = 0
    } catch (InterruptedException | BrokenBarrierException e) {
    e.printStackTrace();
    }
    });
    }
    service.shutdown();
    }
    }
    JAVA
    task1 begin...
    task2 begin...
    task1 task2 finish...
    task1 begin...
    task2 begin...
    task1 task2 finish...
    task1 begin...
    task2 begin...
    task1 task2 finish...

    Process finished with exit code 0