image-20220418150621791

JUC并发编程:

2w字 + 40张图带你参透并发编程! - 知乎 (zhihu.com)

1、什么是JUC

1.1、JUC简介

JUC 就是 java.util .concurrent 工具包的简称。这是一个处理线程的工具包,JDK 1.5 开始出现的

1.2 进程与线程

进程是计算机中的程序关于某数据集合上的一次运动活动,是系统进行资源分配和调度的基本单位,是操作系统进行资源分配和调度的基本单位

线程(thread) 是操作系统能够进行运算调度的最小单位。它被包含在进程之 中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流, 一个进程中可以并发多个线程,每条线程并行执行不同的任务。

总结来说:

进程:指在系统中正在运行的一个应用程序;程序一旦运行就是进程;

进程— —资源分配的最小单位。

线程:系统分配处理器时间资源的基本单元,或者说进程之内独立执行的一个 单元执行流。

线程——程序执行的最小单位。

你也会开始明白进程与线程最大的区别在于上下文切换过程中,线程不用切换虚拟内存,因为同一个进程内的线程都是共享虚拟内存空间的,线程就单这一点不用切换,就相比进程上下文切换的性能开销减少了很多。由于虚拟内存与物理内存的映射关系需要查询页表,页表的查询是很慢的过程,线程间的相互切换是在页表中,而进程切换需要换表,因此会把常用的地址映射关系缓存在 TLB 里的,这样便可以提高页表的查询速度,如果发生了进程切换,那 TLB 缓存的地址映射关系就会失效,缓存失效就意味着命中率降低,于是虚拟地址转为物理地址这一过程就会很慢。

1.3 线程的状态

1.3.1 线程状态的枚举类

image-20220206211956886

当线程被创建并启动以后,它既不是一启动就进入执行状态,也不是一直处于执行状态。在线程的生命周期中,它要经过新建(new)、就绪(runable)、运行(Runing)、阻塞(Blocked)和死亡(Dead)5种状态,

尤其是当线程启动以后,它不可能一直”霸占”着 CPU 独自 运行,所以 CPU 需要在多条线程之间切换,于是线程状态也会多次在运行、阻塞之间切换

新建状态(NEW)

当程序使用 new 关键字创建了一个线程之后,该线程就处于新建状态,此时仅由 JVM 为其分配内存,并初始化其成员变量的

就绪状态(RUNNABLE):

当线程对象调用了 start()方法之后,该线程处于就绪状态。Java 虚拟机会为其创建方法调用栈和程序计数器,等待调度运行。

运行状态(RUNNING):

如果处于就绪状态的线程获得了 CPU,开始执行 run()方法的线程执行体,则该线程处于运行状 态。

阻塞状态(BLOCKED):

阻塞状态是指线程因为某种原因放弃了 cpu 使用权,也即让出了 cpu timeslice,暂时停止运行。 直到线程进入可运行(runnable)状态,才有机会再次获得 cpu timeslice 转到运行(running)状 态。

阻塞的情况分三种:

等待阻塞(o.wait->等待对列):

运行(running)的线程执行 o.wait()方法,JVM 会把该线程放入等待队列(waitting queue) 中。

同步阻塞(lock->锁池)

运行(running)的线程在获取对象的同步锁时,若该同步锁被别的线程占用,则 JVM 会把该线 程放入锁池(lock pool)中。

其他阻塞(sleep/join)

运行(running)的线程执行 Thread.sleep(long ms)或 t.join()方法,或者发出了 I/O 请求时, JVM 会把该线程置为阻塞状态。当 sleep()状态超时、join()等待线程终止或者超时、或者 I/O 处理完毕时,线程重新转入可运行(runnable)状态

wait/sleep 的区别

  • sleep是Thread的静态方法,wait是Object的方法,任何对象实例都可以进行调用
  • sleep不会释放锁,他也不需要占用锁。wait会释放锁,但是调用它发的前提是当前线程占有锁(即代码要在synchronized)
  • 它们都可以被interrupted方法中断

虚拟内存与地址空间映射关系

虚拟内存是操作系统为每个进程提供的一种抽象,每个进程都有属于自己的、私有的、地址连续的虚拟内存,当然我们知道最终进程的数据及代码必然要放到物理内存上,那么必须有某种机制能记住虚拟地址空间中的某个数据被放到了哪个物理内存地址上,这就是所谓的地址空间映射,也就是虚拟内存地址与物理内存地址的映射关系,那么操作系统是如何记住这种映射关系的呢,答案就是页表,页表中记录了虚拟内存地址到物理内存地址的映射关系。有了页表就可以将虚拟地址转换为物理内存地址了,这种机制就是虚拟内存。

上下文切换:

进程切换

切换虚拟地址空间,切换内核栈和硬件上下文

线程切换

切换内核栈和硬件上下文

进程与线程的区别?

(1)进程有自己的独立地址空间,线程没有

(2)进程是资源分配的最小单位,线程是CPU调度的最小单位

(3)进程和线程通信方式不同(线程之间的通信比较方便。同一进程下的线程共享数据(比如全局变量,静态变量),通过这些数据来通信不仅快捷而且方便,当然如何处理好这些访问的同步与互斥正是编写多线程程序的难点。而进程之间的通信只能通过进程通信的方式进行。)

(4)进程上下文切换开销大,线程开销小

(5)一个进程挂掉了不会影响其他进程,而线程挂掉了会影响其他线程

(6)对进程操作一般开销都比较大,对线程开销就小了

为什么进程上下文切换比线程上下文切换代价高?

进程切换分两步:

1.切换页目录以使用新的地址空间

2.切换内核栈和硬件上下文

对于linux来说,线程和进程的最大区别就在于地址空间,对于线程切换,第1步是不需要做的,第2是进程和线程切换都要做的。

切换的性能消耗:

1、线程上下文切换和进程上下问切换一个最主要的区别是线程的切换虚拟内存空间依然是相同的,但是进程切换是不同的。这两种上下文切换的处理都是通过操作系统内核来完成的。内核的这种切换过程伴随的最显著的性能损耗是将寄存器中的内容切换出。

2、另外一个隐藏的损耗是上下文的切换会扰乱处理器的缓存机制。简单的说,一旦去切换上下文,处理器中所有已经缓存的内存地址一瞬间都作废了。还有一个显著的区别是当你改变虚拟内存空间的时候,处理的页表缓冲(processor’s Translation Lookaside Buffer (TLB))被全部刷新,这将导致内存的访问在一段时间内相当的低效。但是在线程的切换中,不会出现这个问题。

img

并发、并行、串行

串行:表示所有任务都一一按先后顺序进行,执行任务必须要等待上一个执行完,进行下一个任务执行。

并行:单位时间内,多个任务同时执⾏。

并发:同⼀时间段,多个任务都在执⾏ (单位时间内不⼀定同时执⾏)

image-20220211220142561

Java中的18把锁:

https://mp.weixin.qq.com/s/cVulL86Fg4qGJKNOnLBzTQ

乐观锁和悲观锁

悲观锁

对应于生活中悲观的人,对于悲观的人总是想着坏的方向发展

image-20220402000609889

在Java语言中,synchronized 和 ReetrantLock等就是典型的悲观锁。还有一些使用了synchronized关键字的容器类如HashTable等也是悲观锁的应用。

乐观锁:

​ 认为拿数据是安全的,每次都会进行数据的提取,乐观锁在更新数据的时候不会进行上锁,在更新判断的时候是否有其他的线程去更新这个数据

image-20220402004445482

乐观锁可以使用版本号机制CAS算法实现。在 Java 语言中 java.util.concurrent.atomic包下的原子类就是使用CAS 乐观锁实现的。

两种锁的使用场景

悲观锁和乐观锁没有孰优孰劣,有各自的应用场景

乐观锁应用在写比较少的情况下,因为不用上锁、释放锁,省去了锁的开销,从而提升了吞吐量。

如果是写多读少的场景,即冲突比较严重,线程间竞争激励,使用乐观锁就是导致线程不断进行重试,这样可能还降低了性能,这种场景下使用悲观锁就比较合适。

synchronized 关键字的了解

synchronized 关键字解决的是多个资源访问的资源的同步性,可以保证被他修饰的方法或则关键字或者代码块在任意时刻只能有一个线程执行

独占锁和共享锁

独占锁

独占锁是指锁一次只能被一个线程所持有。如果一个线程对数据加上排他锁后,那么其他线程不能再对该数据加任何类型的锁。获得独占锁的线程即能读数据又能修改数据

image-20220402010334748

JDK中的synchronizedjava.util.concurrent(JUC)包中Lock的实现类就是独占锁。

共享锁

共享锁是指锁可被多个线程所持有。如果一个线程对数据加上共享锁后,那么其他线程只能对数据再加共享锁,不能加独占锁。获得共享锁的线程只能读数据,不能修改数据。

image-20220402010457979

在 JDK 中 ReentrantReadWriteLock 就是一种共享锁。

互斥锁和读写锁

互斥锁是独占锁的一种常规实现,是指某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。

image-20220402010733824互斥锁一次只能一个线程拥有互斥锁,其他线程只有等待。

读写锁

读写锁是共享锁的一种具体实现。读写锁管理一组锁,一个是只读的锁,一个是写锁。

读锁可以在没有写锁的时候被多个线程同时持有,而写锁是独占的。写锁的优先级要高于读锁,一个获得了读锁的线程必须能看到前一个释放的写锁所更新的内容。

读写锁相比于互斥锁并发程度更高,每次只有一个写线程,但是同时可以有多个线程并发读。

image-20220402010822724

公平锁和非公平锁

公平锁

公平锁是指多个线程按照申请锁的顺序来获取锁,这里类似排队买票,先来的人先买,后来的人在队尾排着,这是公平的。

image-20220402010855628

1
2
3
4
/**
* 创建一个可重入锁,true 表示公平锁,false 表示非公平锁。默认非公平锁
*/
Lock lock = new ReentrantLock(true);

非公平锁

非公平锁是指多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁,在高并发环境下,有可能造成优先级翻转,或者饥饿的状态(某个线程一直得不到锁)。

image-20220402010959009

在 java 中 synchronized 关键字是非公平锁,ReentrantLock默认也是非公平锁。

1
2
3
4
/**
* 创建一个可重入锁,true 表示公平锁,false 表示非公平锁。默认非公平锁
*/
Lock lock = new ReentrantLock(false);

可重入锁:

可重入锁又称之为递归锁,是指同一个线程在外层方法获取了锁,在进入内层方法会自动获取锁。

image-20220402011057644

对于Java ReentrantLock而言, 他的名字就可以看出是一个可重入锁。对于Synchronized而言,也是一个可重入锁。

敲黑板:可重入锁的一个好处是可一定程度避免死锁。

1
2
3
4
5
6
7
8
public synchronized void mehtodA() throws Exception{
// Do some magic tings
mehtodB();
}

public synchronized void mehtodB() throws Exception{
// Do some magic tings
}

上面的代码中 methodA 调用 methodB,如果一个线程调用methodA 已经获取了锁再去调用 methodB 就不需要再次获取锁了,这就是可重入锁的特性。如果不是可重入锁的话,mehtodB 可能不会被当前线程执行,可能造成死锁。

自旋锁

自旋锁是指线程在没有获得锁时不是被直接挂起,而是执行一个忙循环,这个忙循环就是所谓的自旋。

image-20220402011343846

Collection:

ConcurrentHashMap: 线程安全的 HashMap

CopyOnWriteArrayList: 线程安全的 List,在读多写少的场合性能非常好,远远好于 Vector. ConcurrentLinkedQueue: 高效的并发队列,使用链表实现。可以看做一个线程安全的LinkedList,这是一个非阻塞队列。

BlockingQueue: 这是一个接口,JDK 内部通过链表、数组等方式实现了这个接口。表示阻塞队 列,非常适合用于作为数据共享的通道。

ConcurrentSkipListMap: 跳表的实现。这是一个 Map,使用跳表的数据结构进行快速查找。

ConcurrentHashMap

因为HashMap不是线程安全的,在并发场景下,保证可行的方式Collections.synchronizedMap()方法来包装我们的HashMap.但这是通过一个全局的锁来同步的不同线程之间的并发访问,因此会带来性能问题

采用“锁分段”机制:

而在写操作时 通过锁分段技术只对所操作的段加锁而不影响客户端对其它段的访问。

jdk1.8 采用的CAS,可以理解为无锁,相较于其他的,不用进行阻塞

CopyOnWriteArrayList

1
2
3
public class CopyOnWriteArrayList<E>
extends Object
implements List<E>, RandomAccess, Cloneable, Serializable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package JUC.conllections;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class TestCopyOnWriteArrayList {
public static void main(String[] args) {
HelloThread ht = new HelloThread();
for(int i=0;i<1;i++){
new Thread(ht).start();
}
}

// * CopyOnWriteArrayList/CopyOnWriteArraySet : “写入并复制”
// * 注意:添加操作多时,效率低,因为每次添加时都会进行复制,开销非常的大。并发迭代操作多时可以选择。
static class HelloThread implements Runnable{
// 单纯加同步锁不能,在进行添加元素的时候,不能进行操作
// private static List<String> list = Collections.synchronizedList(new ArrayList<>());
private static CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
static{
list.add("AA");
list.add("BB");
list.add("CC");
}
@Override
public void run() {
Iterator<String> it = list.iterator();
while (it.hasNext()){
System.out.println(it.next());
list.add("AA");
}
}
}

}

简介

在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此对于每 次读取都进行加锁其实是一种资源浪费。我们应该允许多个线程同时访问 List 的内部数据,毕竟读取操 作是安全的。

这和我们之前ReentrantReadWriteLock 读写锁的思想非常类似,也就是读读共 享、写写互斥、读写互斥、写读互斥。JDK 中提供了 CopyOnWriteArrayList 类比相比于在读写锁的 思想又更进一步。为了将读取的性能发挥到极致, CopyOnWriteArrayList 读取是完全不用加锁的, 并且更厉害的是:写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。这样一来,读操 作的性能就会大幅度提升。那它是怎么做的呢?

CopyOnWriteArrayList 是如何做到的?

CopyOnWriteArrayList 类的所有可变操作(add,set 等等)都是通过创建底层数组的新副本来实现 的。当 List 需要被修改的时候,我并不修改原有内容,而是对原有数据进行一次复制,将修改的内容写 入副本。写完之后,再将修改完的副本替换原来的数据,这样就可以保证写操作不会影响读操作了。

​ 从 CopyOnWriteArrayList 的名字就能看出 CopyOnWriteArrayList 是满足 CopyOnWrite 的 ArrayList,所谓 CopyOnWrite 也就是说:

在计算机,如果你想要对一块内存进行修改时,我们不在原 有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后呢,就将指向原来 内存指针指向新的内存,原来的内存就可以被回收掉了。

CopyOnWriteArrayList 读取和写入源码简单分析

CopyOnWriteArrayList 读取操作的实现

读取操作没有任何同步控制和锁操作,理由就是内部数组 array 不会发生修改,只会被另外一个 array 替换,因此可以保证数据安全。

1
2
3
4
5
6
7
8
9
10
11
12
/*The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
public E get(int index) {
return get(getArray(), index);
}
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
return (E) a[index];
}
final Object[] getArray() {
return array;
}

CopyOnWriteArrayList 写入操作的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*Apends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();//加锁
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);//拷贝新数组
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();//释放锁
}
}

synchronized:

synchronized 是 Java 中的关键字,是一种同步锁

  1. 修饰一个代码块,被修饰的代码块称为同步语句块,其作用的范围是大括号{} 括起来的代码,作用的对象是调用这个代码块的对象;
  2. 修饰一个方法,被修饰的方法称为同步方法,其作用的范围是整个方法,作用 的对象是调用这个方法的对象;

synchronized定义方法,但是synchronized并不属于方法的定义。因此synchroized关键字不能够被继承,

如果在父类中的某个方 法使用了 synchronized 关键字,而在子类中覆盖了这个方法,在子类中的这 个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上 synchronized 关键字才可以。

如果在父类中的某个方 法使用了 synchronized 关键字,而在子类中覆盖了这个方法,在子类中的这 个方法默认情况下并不是同步的,而必须显式地在子类的这个方法中加上 synchronized 关键字才可以。

修改一个静态的方法,其作用的范围是整个静态方法,作用的对象是这个类的所有的对象

修改一个类,其作用的范围是synchronnized后面括号括起来的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package lock;


import java.util.concurrent.locks.ReentrantLock;

class LTicket {
// 票数量
private int number =30;

// 创建可重入锁
private final ReentrantLock lock = new ReentrantLock();
// 买票的方法
public void sale(){
// 上锁
lock.lock();
try {
if (number>0){
System.out.println(Thread.currentThread().getName()+" : 卖出:"+(number--)+" 剩下:"+number);
}

}finally {
// 解锁
lock.unlock();
}
}
}
public class LSaleTicket {

public static void main(String[] args) {
LTicket ticket = new LTicket();

// 创建三个线程
new Thread(()->{

for (int i=0;i<40;i++){
ticket.sale();
}
},"aa").start();

new Thread(()->{

for (int i=0;i<40;i++){
ticket.sale();
}
},"bb").start();
new Thread(()->{

for (int i=0;i<40;i++){
ticket.sale();
}
},"cc").start();

}
}

如果一个代码块被 synchronized 修饰了,当一个线程获取了对应的锁,并执 行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里 获取锁的线程释放锁只会有两种情况: 1)获取锁的线程执行完了该代码块,然后线程释放对锁的占有; 2)线程执行发生异常,此时 JVM 会让线程自动释放锁。 那么如果这个获取锁的线程由于要等待 IO 或者其他原因(比如调用 sleep 方法)被阻塞了,但是又没有释放锁,其他线程便只能干巴巴地等待,试想一 下,这多么影响程序执行效率。 因此就需要有一种机制可以不让等待的线程一直无期限地等待下去(比如只等 待一定的时间或者能够响应中断),通过 Lock 就可以办到。

locks

解决多线程安全问题:

1、synchronized:隐式锁

同步代码块

同步方法

2、同步锁

显示锁,需要通过lock()方法上锁,必须痛过unlock方法进行释放

解决多线程安全问题:

synchronized:

1、同步代码块

2、同步方法

jdk 1.5 后:

同步锁 Lock

注意:是一个显示锁,需要通过lock()方法上锁,必须通过unlock()方法进行释放锁

生产者消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//店员
static class Clerk {

private int product = 0;
//进货
public synchronized void get() {
if (product >= 10) {
System.out.println("产品已满");
} else {
System.out.println(Thread.currentThread().getName() + ":" + product++);
}
}

// 卖货
public void sale() {
if (product <= 0) {
System.out.println("缺货");
} else {
System.out.println(Thread.currentThread().getName() + ":" + (--product));
}
}
}

生产者线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//生产者
static class Product implements Runnable {
private Clerk clerk;

public Product(Clerk clerk) {
this.clerk = clerk;
}

@Override
public void run() {
//不断的生产商品
for (int i = 0; i < 20; i++) {
clerk.get();
}
}
}

消费者线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//消费者
static class Consumer implements Runnable {
private Clerk clerk;

public Consumer(Clerk clerk) {
this.clerk =clerk;
}


@Override
public void run() {
for (int i = 0; i < 20; i++) {
clerk.sale();
}
}
}

如果不添加等待唤醒机制,那么就是当没有货物的时候,还会进行访问

解决方式:

等待唤醒机制

image-20220401000338837

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package Productor;


public class TestProductorAndConsumer {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Product product = new Product(clerk);
Consumer cus = new Consumer(clerk);

new Thread(product, "生产A").start();
new Thread(cus, "消费B").start();

}

//店员
static class Clerk {

private int product = 0;
//进货
// 同步锁
public synchronized void get() {
if (product >= 1) {
System.out.println("产品已满");

try {
// 等待唤醒机制
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}

}
System.out.println(Thread.currentThread().getName() + ":" + ++product);
this.notifyAll();

}

// 卖货
public synchronized void sale() {
if (product <= 0) {
System.out.println("缺货");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}

}
//t
System.out.println(Thread.currentThread().getName() + ":" + (--product));
this.notifyAll();
}
}

//生产者
static class Product implements Runnable {
private Clerk clerk;

public Product(Clerk clerk) {
this.clerk = clerk;
}

@Override
public void run() {
//不断的生产商品
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.get();
}
}
}
//消费者
static class Consumer implements Runnable {
private Clerk clerk;

public Consumer(Clerk clerk) {
this.clerk =clerk;
}


@Override
public void run() {
for (int i = 0; i < 20; i++) {
clerk.sale();
}
}
}

}




tools

CountDownLatch:

CountDownLatch 一个同步辅助类,在完成一组正在其他线程中执行的操作 之前,它允许一个或多个线程一直等待。

闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活 动直到其他活动都完成才继续执行:

➢ 确保某个计算在其需要的所有资源都被初始化之后才继续执行;

➢ 确保某个服务在其依赖的所有其他服务都已经启动之后才启动;

➢ 等待直到某个操作所有参与者都准备就绪再继续执行

阻塞队列:

含义

阻塞队列,顾名思义,首先它是一个队列, 通过一个共享的队列,可以使得数据 由队列的一端输入,从另外一端输出;

image-20220411151950831

  • 当队列是空的,从队列中获取元素的操作将会被阻塞
  • 队列是满的,从队列中添加元素的操作将会被阻塞
  • 试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
  • 试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多 个元素或者完全清空,使队列变得空闲起来并后续新增

在多线程领域:所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起 的线程又会自动被唤起

常用的队列:

先进先出(FIFO):先插入的队列的元素也最先出队列,类似于排队的功能。 从某种程度上来说这种队列也体现了一种公平性

后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发 生的事件(栈)

为什么要使用阻塞队列:

因为不用关心什么时候进行阻塞进程,交给队列进行实现,给程序带来复杂性。

在多线程的环境下,通过队列可以很方便的实现数据共享,比如经典的“生产者”和消费者的问题,我们可以使用队列便利的实现两者间的数据共享问题。一端生产一端进行消费但如果生产者和消费者在某个时间段内,万一 发生数据处理速度不匹配的情况呢?

理想情况下,如果生产者产出数据的速度 大于消费者消费的速度,并且当生产出来的数据累积到一定程度的时候,那么 生产者必须暂停等待一下(阻塞生产者线程),以便等待消费者线程把累积的 数据处理完毕,反之亦然。

当队列中没有数据时,消费者端的所有的线程被阻塞(挂起),直到有数据放入队列中。

当队列中填满数据的情况下,生产端的所有的线程都会被自动阻塞(挂起),直到队列有空的位置,线程被自动唤醒。

BlockingQueue 的核心API

image-20220411144550371

1、放入数据:

• offer(anObject):表示如果可能的话,将 anObject 加到 BlockingQueue 里,即 如果 BlockingQueue 可以容纳,则返回 true,否则返回 false.(本方法不阻塞当 前执行方法的线程)

• offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定 的时间内,还不能往队列中加入 BlockingQueue,则返回失败

• put(anObject):把 anObject 加到 BlockingQueue 里,如果 BlockQueue 没有 空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续.

2.获取数据

• poll(time): 取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 null

• poll(long timeout, TimeUnit unit):从 BlockingQueue 取出一个队首的对象, 如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知 道时间超时还没有数据可取,返回失败。

• take(): 取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断 进入等待状态直到 BlockingQueue 有新的数据被加入;

• drainTo(): 一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定 获取数据的个数),通过该方法

,可以提升获取数据效率;不需要多次分批加 锁或释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

// 创建阻塞队列
final BlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue(3);
//
// System.out.println(arrayBlockingQueue.add("a"));
// System.out.println(arrayBlockingQueue.add("b"));
// System.out.println(arrayBlockingQueue.add("c"));
////
// System.out.println(arrayBlockingQueue.element());
//// 获取队列的第一个值
// System.out.println(arrayBlockingQueue.element());
//
//// 会报错
//// arrayBlockingQueue.add("d");
//// 当队列中没有元素或则队列中元素满了,就会抛出异常
// System.out.println(arrayBlockingQueue.remove());
// System.out.println(arrayBlockingQueue.remove());
// System.out.println(arrayBlockingQueue.remove());
// System.out.println(arrayBlockingQueue.remove());
System.out.println(arrayBlockingQueue.offer("a"));
System.out.println(arrayBlockingQueue.offer("b"));
System.out.println(arrayBlockingQueue.offer("c"));

System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());
System.out.println(arrayBlockingQueue.poll());

1.add()与remove()如果不足或者满,会抛出异常

image-20220411145529687

2.使用offer()与poll(),如果队列空值或者队列满时,添加值返回false,移除值会返回null

image-20220411145320456

3.当阻塞队列满时,生产者线程继续往队列里put元素,队列会一直阻塞生产者线程直到put数据or响应中断是出
当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用

1
2
3
4
5
6
7
8
9
10
arrayBlockingQueue.put("a");
arrayBlockingQueue.put("b");
arrayBlockingQueue.put("c");

System.out.println(arrayBlockingQueue.take());

System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue.take());
System.out.println(arrayBlockingQueue.take());

4、offer(“w”,3L, TimeUnit.SECONDS)

可以设定等待的时间,如果在指定 的时间内,还不能往队列中加入 BlockingQueue,则返回失败image-20220411150425506

1
2
3
4
5
System.out.println(arrayBlockingQueue.offer("a"));
System.out.println(arrayBlockingQueue.offer("b"));
System.out.println(arrayBlockingQueue.offer("c"));
System.out.println(arrayBlockingQueue.offer("w",3L, TimeUnit.SECONDS));

并发队列

ArrayBlockingQueue:

数组结构组成的有界阻塞队列,

在 ArrayBlockingQueue 内部,维护了一个定长数 组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数 组外,ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的 头部和尾部在数组中的位置。

ArrayBlockingQueue 在生产者放入数据和消费者获取数据,都是共用同一个 锁对象,由此也意味着两者无法真正并行运行。

ArrayBlockingQueue :可以控制内部锁是否采用公平锁,默认采用非公平锁

LinkedBlockingQueue:写入与获取元素已经足够轻巧,引入独立的锁机制,会产生额外的消耗

LinkedBlockingQueue:生产者端和消费者端分别采用了独立的锁来控制数据同步

LinkedBlockingQueue:

链表结构组成的有界(但大小默认值为 integer.MAX_VALUE)阻塞队列。

当生产者往队列中放入一个数据 时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回; 只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue 可以通过 构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份 数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。 而 LinkedBlockingQueue 之所以能够高效的处理并发数据,还因为其对于生 产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发 的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列 的并发性能。

DelayQueue:

使用优先级队列实现的延迟无界阻塞队列

DelayQueue 中的元素只有当其指定的延迟时间到了,才能够从队列中获取到 该元素。DelayQueue 是一个没有大小限制的队列,因此往队列中插入数据的 操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻 塞。

PriorityBlockingQueue

基于优先级的阻塞队列(优先级的判断通过构造函数传入的 Compator 对象来 决定)

支持优先级排序的无界阻塞队列

不会阻塞数据生产者,只会在没有可消费的数据中,阻塞数据消费者

生产者生产数据的速度绝对不能快于消费者消费 数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。

采用控制线程同步采用公平锁

SynchronousQueue

不存储元素的阻塞队列,也即单个元素的队列

一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产 者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须 亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么 对不起,大家都在集市等待。相对于有缓冲的 BlockingQueue 来说,少了一 个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经 销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以 库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式 会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得 产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能 可能会降低。

声明一个 SynchronousQueue 有两种不同的方式,它们之间有着不太一样的 行为。

公平模式和非公平模式的区别:

公平模式:SynchronousQueue 会采用公平锁,并配合一个 FIFO 队列来阻塞 多余的生产者和消费者,从而体系整体的公平策略;

非公平模式(SynchronousQueue 默认):SynchronousQueue 采用非公平 锁,同时配合一个 LIFO 队列来管理多余的生产者和消费者,而后一种模式, 如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有 某些生产者或者是消费者的数据永远都得不到处理。

LinkedTransferQueue

LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队 列。

相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。

LinkedTransferQueue 采用一种预占模式。意思就是消费者线程取元素时,如 果队列不为空,则直接取走数据,若队列为空,那就生成一个节点(节点元素 为 null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队时 发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的 方法返回。

LinkedBlockingDeque

LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列,即可以从队 列的两端插入和移除元素。

对于一些指定的操作,在插入或者获取队列元素时如果队列状态不允许该操作 可能会阻塞住该线程直到队列状态变更为允许操作,这里的阻塞一般有两种情 况

• 插入元素时: 如果当前队列已满将会进入阻塞状态,一直等到队列有空的位置时 再讲该元素插入,该操作可以通过设置超时参数,超时后返回 false 表示操作 失败,也可以不设置超时参数一直阻塞,中断后抛出 InterruptedException 异 常

• 读取元素时: 如果当前队列为空会阻塞住直到队列不为空然后返回元素,同样可 以通过设置超时参数

线程池:

线程池:一种线程使用模式。线程过多会带来调度开销, 进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理 者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代 价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

线程池的优势:

线程池做的工作只是控制运行的线程的数量,处理过程中将任务放入队列中,线程过多会带来调度开销, 进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理 者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代 价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

它的主要特点为:

降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。

• 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。

• 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资 源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

• Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors, ExecutorService,ThreadPoolExecutor

线程池参数:

• corePoolSize 线程池的核心线程数

• maximumPoolSize 能容纳的最大线程数

• keepAliveTime 空闲线程存活时间

• unit 存活的时间单位

• workQueue 存放提交但未执行任务的队列

• threadFactory 创建线程的工厂类

• handler 等待队列满后的拒绝策略

线程池中,有三个重要的参数,决定影响了拒绝策略:

corePoolSize - 核心线 程数,也即最小的线程数。

workQueue - 阻塞队列 。

maximumPoolSize - 最大线程数 当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻 塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到 maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池 的拒绝策略了。 总结起来,也就是一句话,当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。

拒绝策略(重点)

CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用 线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由 于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效 率上必然的损失较大

AbortPolicy: 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常 信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。

DiscardPolicy: 直接丢弃,其他啥都没有

DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞 队列 workQueue 中最老的一个任务,并将新任务加入

线程池的种类与创建

newFixedThreadPool(常用)

作用:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中 等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。

特征:

• 线程池中的线程处于一定的量,可以很好的控制线程的并发量

• 线程可以重复被使用,在显示关闭之前,都将一直存在

• 超出一定量的线程被提交时候需在队列中等待

image-20220414144445132

newCachedThreadPool(常用)

作用:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空 闲线程,若无可回收,则新建线程.

特点:

• 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)

• 线程池中的线程可进行缓存重复利用和回收(回收默认时间为 1 分钟)

• 当线程池中,没有可用线程,会重新创建一个线程

场景:适用于创建一个无限扩大的线程池,服务器负载压力较轻,执行时间短,任务多的场景

image-20220414145258875

newScheduleThreadPool(了解)

作用: 线程池支持定时以及周期性执行任务,创建一个 corePoolSize 为传入参数,最大线程数为整形的最大数的线程池

特征:

(1)线程池中具有指定数量的线程,即便是空线程也将保留

(2)可定时或者 延迟执行线程活动

创建方式:

1
2
3
4
5
6
public static ScheduledExecutorService newScheduledThreadPool(int
corePoolSize,
ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize,
threadFactory);
}

场景: 适用于需要多个后台线程执行周期任务的场景

newWorkStealingPool

jdk1.8 提供的线程池,底层使用的是 ForkJoinPool 实现,创建一个拥有多个 任务队列的线程池,可以减少连接数,创建当前可用 cpu 核数的线程来并行执 行任务

创建方式:

1
2
3
4
5
6
7
8
9
10
11
12
public static ExecutorService newWorkStealingPool(int parallelism) {
/**
* parallelism:并行级别,通常默认为 JVM 可用的处理器个数
* factory:用于创建 ForkJoinPool 中使用的线程。
* handler:用于处理工作线程未处理的异常,默认为 null
* asyncMode:用于控制 WorkQueue 的工作模式:队列---反队列
*/
return new ForkJoinPool(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
true);
}

场景: 适用于大耗时,可并行执行的场景

线程池底层工作原理(重要)

  1. 在创建了线程池后,线程池中

image-20220414150246609

1.在创建线程池后,线程池中的线程数为零

2.当调用execute()方法添加一个请求任务时线程池会做如下判断:

2.1 如果线程池数量小于或则等于corePoolSize,那么马上创建线程运行这个任务;

2.2 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;

2.3 如果这个时候队列满了且正在运行的线程数量还小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;

  1. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:

    4.1 如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。

    4.2 所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小

image-20220414150655550

  1. 项目中创建多线程时,使用常见的三种线程池创建方式,单一、可变、定长都 有一定问题,原因是 FixedThreadPool 和 SingleThreadExecutor 底层都是用 LinkedBlockingQueue 实现的,这个队列最大长度为 Integer.MAX_VALUE, 容易导致 OOM。所以实际生产一般自己通过 ThreadPoolExecutor 的 7 个参 数,自定义线程池

  2. 创建线程池推荐适用 ThreadPoolExecutor 及其 7 个参数手动创建 o corePoolSize 线程池的核心线程数 o maximumPoolSize 能容纳的最大线程数 o keepAliveTime 空闲线程存活时间 o unit 存活的时间单位 o workQueue 存放提交但未执行任务的队列 o threadFactory 创建线程的工厂类 o handler 等待队列满后的拒绝策略

  3. 为什么不允许适用不允许 Executors.的方式手动创建线程池,如下图

image-20220414150809843