什么是JUC

java.util.concurrent工具包的简称,处理线程的一个工具包

进程

资源分配的最小单位,在系统中运行的一个应用程序

线程

程序执行的最小单位,一个进程包含多个线程

线程状态

image-20210901110727

新建、就绪、阻塞、等待1、等待2、终结

waiting(不见不散—不来就一直等)

timed_waiting(过时不候)

wait和sleep

1、sleep是Thread的静态方法,wait是object方法,任何对象都能调用

2、sleep不会释放锁,也不占用锁。wait会释放锁,前提是先占用锁(代码在synchronized中)

3、都可以被interrupted方法中断

并发和并行

并发:有同时处理多个任务的能力,不一定要同时

并行:同时执行任务的能力

区别在于是否可以同时

管程

只是保证了同一时刻只有一个进程在管程内活动,即管程内定义的操作在同一时刻只被一个进程调用(由编译器实现).但是这样并不能保证进程以设计的顺序执行,因此需要设置condition变量,让进入管程而无法继续执行的进程阻塞自己.

synchronized

用来解决多线程访问资源的同步性,可以保证它修饰的方法或代码块在任意时刻只能有一个线程执行。

在1.6之前是重量级锁,之后进行了锁升级,减少锁操作带来的开销

多线程编程步骤

1、创建资源类

2、在资源类中创建属性和操作方法

3、创建多个线程,调用资源类的操作方法

4、防止虚假唤醒问题

LOCK接口

可重入锁,比synchronized拥有更广泛的操作

区别

1、lock是一个接口,synchronized是关键字,内置语言实现

2、synchronized发生异常会自动释放占有的锁,因此不会导致死锁,lock如果没有unlock释放锁,很可能死锁,所以最好放到finally中

3、lock可以响应中断,synchronized不行,会一直等待

4、lock可以知道有没有获取锁,synchronized不行

5、lock可以提高多个线程读操作的效率

总结

竞争资源激烈的情况下,推荐使用lock

volatile

1、保证可见性,防止主存与缓存数据的不一致。指示jvm,到主存中读取数据
2、防止JVM指令重排,保证多线程环境下也能正常运行

分布式锁

数据库、redis、zk

特点:同一时间,只有一个客户端可以获取锁,其他客户端等待

ThreadLocal

让每个线程都有自己的本地变量,从而避免线程安全问题
示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ThreadLocalTest {
private List<String> messages = Lists.newArrayList();

public static final ThreadLocal<ThreadLocalTest> holder = ThreadLocal.withInitial(ThreadLocalTest::new);

public static void add(String message) {
holder.get().messages.add(message);
}

public static List<String> clear() {
List<String> messages = holder.get().messages;
holder.remove();

System.out.println("size: " + holder.get().messages.size());
return messages;
}

public static void main(String[] args) {
ThreadLocalTest.add("一枝花算不算浪漫");
System.out.println(holder.get().messages);
ThreadLocalTest.clear();
}
}

每个线程在往ThreadLocal里放值的时候,都会往自己的ThreadLocalMap里存,读也是以ThreadLocal作为引用,在自己的map里找对应的key,从而实现了线程隔离。其中的value是一个弱引用

  • 强引用:new出来的对象。宁愿抛OOM,也不会被回收
  • 软引用:被softreference修饰,在内存将要溢出的时候进行回收
  • 弱引用:被weekreference修饰,只要发生垃圾回收,那么就会被回收
  • 虚引用:被phantomreference修饰,用队列接收对象即将死亡的通知

实现场景

spring,threadLocal的存储类型是一个map,key是datasource,value是connection(为了应对多数据源),用threadLocal保证统一线程获取的是一个connection对象,从而保证一次事务所有操作需要在统一数据库连接上

ThreadLocal的内存泄露

ThreadLocal是一个外壳,真正的存储结构是其中的Map,但这个map是在Thread上定义的,ThreadLocal本身不提供值,作为key让线程从map中获取value。内存泄漏指ThreadLocal回收了,map entry的key没有了指向,但引用一直存在,导致了内存泄漏。解决的最佳方式为用完之后手动remove掉

为什么将map中的key设置为弱引用

外界通过map操作,假设将ThreadLocal设置为null,那其中的map强引用指向ThreadLocal也没有意义了。弱引用可以预防大多数内存泄漏的情况

为什么能实现线程隔离

在于其中的map,只初始化一次,分配一块内存地址

什么是内存泄漏

申请完内存,用完没有及时的进行释放,自己又没法在用,系统也无法进行回收

线程间的通信

让线程按照既定的步骤去执行

可能会出现的问题:虚假唤醒,就是自己本来是wait状态,然后别人又进行了唤醒,那么他就会执行下面的逻辑,所以状态需要一直进行判断,所以判断状态需要放到while中

线程间的定制化通信

线程按照顺序执行A执行完通知B执行完通知C,以此类推,执行10轮

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public class DIY {
int flag = 1;
Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();

void print3(int loop) throws Exception{
lock.lock();
try {
while (flag != 1) {
condition1.await();
}
for (int i = 0; i < 3; i++)
System.out.println(Thread.currentThread().getName()+i+"轮数:"+loop);
flag = 2;
//通知给B
condition2.signal();
} finally {
lock.unlock();
}
}

void print6(int loop) throws Exception{
lock.lock();
try {
while (flag != 2) {
condition2.await();
}
for (int i = 0; i < 6; i++)
System.out.println(Thread.currentThread().getName()+i+"轮数:"+loop);
flag = 3;
//通知给B
condition3.signal();
} finally {
lock.unlock();
}
}

void print9(int loop) throws Exception{
lock.lock();
try {
while (flag != 3) {
condition3.await();
}
for (int i = 0; i < 9; i++)
System.out.println(Thread.currentThread().getName()+i+"轮数:"+loop);
flag = 1;
//通知给B
condition1.signal();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
DIY diy = new DIY();
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
diy.print3(i);
}
} catch (Exception e) {

}
},"a").start();

new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
diy.print6(i);
}
} catch (Exception e) {

}
},"b").start();

new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
diy.print9(i);
}
} catch (Exception e) {

}
},"c").start();
}
}

线程安全

线程安全的核心理念:要么只读,要么加锁

解决线程安全是进阶的重要能力

集合

CopyOnWriteArrayList代替ArrayList

  • 读操作没有锁,因为是数组直接替换,不需要修改,没有安全问题
  • 写操作加了ReentrantLock,保证同步,避免多线程创建多个副本的问题

CopyOnWriteArraySet代替ArraySet

ConcurrentHashMap代替HashMap

跳表

对于单链表,查询需要从头查到尾,效率低下。调表的意义在于可以跳着查,提高效率,与平衡树不同的是,平衡树添加和删除需要对全局进行调整,而跳表是局部的。这直接导致锁范围的不同,在高并发场景下,会有更好的性能

本质是维护多个链表,没上面一层都是下面的子集,如图找18的元素,可以进行跳跃式查找,效率得到明显的提升
20210918114140

可以得出,跳表是一种利用空间换时间的算法。哈希不保证有序性,如果需要有序性,可以考虑跳表的实现实现ConcurrentSkipListMap

AQS组件

wait() —> CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"号线程执行了");
//每执行一个线程计数器-1
countDownLatch.countDown();
},i+"").start();
}
//如果没有变为0则等待
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"最后执行");
}
}

notify() —> Senaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SemaphoreDemo {
public static void main(String[] args) {
//信号量设置为3,表示线程数不能超过此值,超过会等待
Semaphore semaphore = new Semaphore(3);

//创建6个线程
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
//抢占资源
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到了车位");
//留有时间抢占资源
Thread.sleep(new Random().nextInt(3));
System.out.println(Thread.currentThread().getName()+"离开了车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放,别的线程唤醒继续抢车位
semaphore.release();
}
},i+"").start();
}
}
}

多线程锁的8种情况

定义两个方法,都加synchronized,创建两个线程,先后调用方法1,2,线程中间睡1s

①标准调用,1—2

②在第一个方法中睡4s,整体会先睡4s,然后正常调用

③新建普通方法3替换2,不加synchronized,调用顺序3—1

④两个对象调用,先2后1,锁的是一个对象

⑤方法前加static,跟②相同

⑥方法前加static,用两个对象调用,跟②相同,类对象是一个

⑦方法1静态,2普通,先调用普通,在调用静态

⑧⑦的条件,两个对象调用,结果同⑦

总结

对于普通同步方法,锁的是当前实例对象

对于静态同步方法,锁的是类对象

同步方法块,锁的是括号中配置的对象

各种锁

乐观锁与悲观锁

悲观锁:对任何操作都是悲观的,认为它访问的时候别人也会访问,所以在操作前会进行上锁,操作后解锁,安全,但效率低,只能一个一个进行操作
image-20210901111923904

乐观锁,对任何操作都是乐观的,认为访问的时候别人不会访问,每次操作会给定一个版本号,如果版本号不同则不允许操作,适合读多写少
image-20210901112004815

独占锁共享锁

独占锁:只能被一个线程享有。synchronized和lock实现类就是独占锁,并发不行,性能较低
image-20210901112224415

共享锁:可以被多个线程享有。如果一个线程加了共享锁,那么就不能加独占锁。ReentrantReadWriteLock就是共享锁
image-20210901112357119

独占锁和共享锁的体现

互斥锁 :独占锁的体现,具有唯一性和排他性。
image-20210901112740335
读写锁:共享锁的体现。读写锁管理一组锁,一个是只读的锁,一个是写锁。
读锁:共享锁,会发生死锁,可以多个一起读

写锁:独占锁,会发生死锁,只能一个进行写
ReentrantReadWriteLock是读写锁,读读可以共享,提高性能,存在的问题,存在锁饥饿的问题,一直读的时候,程序没法写。读不可以写,写可以读。
image-20210901112911028

读写锁的降级

写入锁降低为读锁,目的提高数据的可见性

jdk8说明

1、获取写锁

2、获取读锁

3、释放写锁

4、释放读锁

此时读锁不能升级为写锁

表锁和行锁

表锁:对整张表上锁,对某条记录修改,整个表都不能访问,必须等其释放锁,不会有死锁

行锁:操作哪条记录就对哪条记录上锁,别人还可以访问别的记录,会有死锁

公平锁和非公平锁

公平锁:雨露均沾,每个线程都会分到,但不是绝对平均

非公平锁:没有顺序,不一定每个线程都会分到,分配不均
默认new出来的是非公平锁,非公平锁可能会导致线程饿死的情况,活都给别的线程分配了,分配不均,但执行效率高。而公平锁会做到雨露均沾,但不保证绝对平均,效率相对会低一点
image-20210901113319936

可重入锁

锁里面还能调锁,又叫递归锁。synchronized是隐式的,lock是显示的,都是可重入锁,就是可以进行递归调用
image-20210901113357915

1
2
3
4
5
6
7
8
public synchronized void mehtodA() throws Exception{
// Do some magic tings
mehtodB();
}

public synchronized void mehtodB() throws Exception{
// Do some magic tings
}

自旋锁

线程在没有锁的时候不会被挂起,而是处于一个忙循环状态,类似等待的状态,这个又叫自旋
image-20210901113736015

目的:减少被挂起的几率,线程的唤醒和挂起很耗费资源,但一致自旋也是耗性能,所以并不适合于时间长的并发情况
如下代码,CAS 操作如果失败就会一直循环获取当前 value 值然后重试

1
2
3
4
5
6
7
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}

在JDK1.6又引入了自适应自旋,这个就比较智能了,自旋时间不再固定,由前一次在同一个锁上的自旋时间以及锁的拥有者的状态来决定。如果虚拟机认为这次自旋也很有可能再次成功那就会次序较多的时间,如果自旋很少成功,那以后可能就直接省略掉自旋过程,避免浪费处理器资源。

死锁

互相等待对方去释放锁,没有外力干涉,就无法执行下去。A需要B持有的资源,但B不释放,B需要A持有的资源,但A不释放,都在等对方释放

产生原因

1、系统资源不足

2、进程推进不合适

3、资源分配不当

如何查看

jstack,jvm自带工具,jstack 进程号,查看是否死锁

Java案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class DeadLock {

//创建两个对象
static Object o1 = new Object();
static Object o2 = new Object();
public static void main(String[] args) {
new Thread(()->{
System.out.println(" LockA 开始执行");
synchronized (o1) {
System.out.println("持有锁A,试图获取B");
synchronized (o2) {
System.out.println("获取到B");
}
}
}).start();
new Thread(()->{
System.out.println(" LockB 开始执行");
synchronized (o2) {
System.out.println("持有锁B,试图获取A");
synchronized (o1) {
System.out.println("获取到A");
}
}
}).start();
}
}

结果

1
2
3
4
LockA 开始执行
LockB 开始执行
持有锁A,试图获取B
持有锁B,试图获取A

锁升级

提升锁性能,减少获取和释放锁带来的消耗,提供4种状态无锁、偏向锁、轻量级锁和重量级锁
无锁:乐观锁

偏向锁:指它会偏向于第一个访问锁的线程,如果在运行过程中,只有一个线程访问加锁的资源,不存在多线程竞争的情况,那么线程是不需要重复获取锁的,这种情况下,就会给线程加一个偏向锁。第一次访问时id为空,jvm为其线程设置id

轻量级锁:当线程竞争变得比较激烈时,偏向锁就会升级为轻量级锁,轻量级锁认为虽然竞争是存在的,但是理想情况下竞争的程度很低,通过自旋方式等待上一个线程释放锁。判断id是否一致若一致使用该对象,不一致升级为轻量级锁

重量级锁:也是互斥锁。如果线程并发进一步加剧,线程的自旋超过了一定次数,或者一个线程持有锁,一个线程在自旋,又来了第三个线程访问时(反正就是竞争继续加大了),轻量级锁就会膨胀为重量级锁,重量级锁会使除了此时拥有锁的线程以外的线程都阻塞。

锁优化

锁粗化:能不上锁就不上,范围能小就小

锁消除:指虚拟机编译器在运行时检测到了共享数据没有竞争的锁,从而将这些锁进行消除。
比如这个for循环

1
2
3
4
5
6
for (int i = 0; i < 1000; i++) {
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("dali");
stringBuffer.append("技术分享");
stringBuffer.toString();
}

已知StringBuffer是一个线程安全类,但stringBuffer变量属于局部变量,存于栈中,栈为线程私有本来就是线程安全,而append方法是一个同步方法,只会白白浪费系统资源,为了提升效率,虚拟机帮我们消除了这些同步锁,这个过程就被称为锁消除

1
2
3
4
5
6
@Override
public synchronized StringBuffer append(String str) {
toStringCache = null;
super.append(str);
return this;
}

callable接口

创建线程的方式

1、继承Thread类

2、实现Runnable接口

3、callable接口(有返回值的Runnable)

4、线程池

传统创建线程的方式,无法获取线程返回结果,为了支持该功能,增加了callable接口

与Runnable的区别

1、是否有返回值,callbable有

2、是否抛出异常,callable抛

3、实现方法名不同,callable实现call(),另一个是run()

如何利用callable创建线程

Thread没有提供callable的参数,那么就需要找到一个类,能够建立起runnable与callable的关系

Runnable有一个实现类,FutureTask,可以传递callable接口

FutureTask:在不影响主线程的情况下,单开一个线程,做其他事情,最后在进行汇总

创建一个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask(MyThread::new);
new Thread(futureTask,"aa").start();
while (!futureTask.isDone()) {
//不完成,输出wait
System.out.println("wait...");
}
//调用get方法
System.out.println(futureTask.get());
//调用第二次的时候直接返回
System.out.println(futureTask.get());
}
}
class MyThread implements Callable{

@Override
public Object call() throws Exception {
return null;
}
}

JUC辅助类

1、减少计数

CountDownLatch

基于AQS实现,在构建对象的时候传入的值会赋给AQS的关键变量state,-1操作是利用CAS将state-1,执行await方法,判断state是否为0,不为0则加入队列中,将该线程进行阻塞(头节点除外,会自旋等待,当state为0时,将剩余的在队列中阻塞的节点一并唤醒)。

await实际上让头节点一直在等待state为0时,释放等待的线程

设置一个计数器类,countDown()方法可以让初始值-1,允许一个线程或多个线程等待,直到这些线程完成他们的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"号线程执行了");
//每执行一个线程计数器-1
countDownLatch.countDown();
},i+"").start();
}
//如果没有变为0则等待
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"最后执行");
}
}

2、循环栅栏

CyclicBarrier,当线程到达某状态后,暂停下来等待其他的线程,所有线程都到达之后才会继续执行,阻塞的是任务线程,主线程是不受影响的

线程到达等待数执行条件,相较于CountDownLatch来说它是可复用的。它没有使用AQS的state变量,借助ReentrantLock和condition等待唤醒,在构建该对象时,传入的值会赋给内部维护的count变量,也会赋值给parties变量(复用的关键),调用await会使count-1,利用ReentrantLock保护线程的安全性,如果count不为0,则添加进condition队列中,当count=0,将队列中的线程全部唤醒,并将parties的值重新赋值为count实现复用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class CyclicBarrierDemo {
public static void main(String[] args){
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("线程等待数已达条件");
});
for (int i = 0; i < 7; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"线程创建完毕");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},i+"").start();
}
}
}

3、信号量机制

Semaphore,信号量机制,到达某个临界值会处于等待状态,直到线程被释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SemaphoreDemo {
public static void main(String[] args) {
//信号量设置为3,表示线程数不能超过此值,超过会等待
Semaphore semaphore = new Semaphore(3);

//创建6个线程
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
//抢占资源
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到了车位");
//留有时间抢占资源
Thread.sleep(new Random().nextInt(3));
System.out.println(Thread.currentThread().getName()+"离开了车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放,别的线程唤醒继续抢车位
semaphore.release();
}
},i+"").start();
}
}
}

阻塞队列

image-20210818170912835

当队列元素满之后就不能放了,就阻塞。如果元素为空那么就不能取了,就阻塞

为什么使用:不需要关系什么时候阻塞线程,什么时候唤醒线程。阻塞队列已经实现了

分类及方法

BlockingDeque与BlockingQueue

ArrayBlockingQueue(常用)数组的队列

基于数组实现,内部维护了一个定长的数组,以便缓存队列中的数据对象,是一个常用的阻塞队列

LinkedBlockingQueue(常用)链表的队列

基于链表实现,大小默认值为int的最大值,是一个常用的阻塞队列

DealyQueue

使用优先级队列实现延迟无界阻塞队列,没有大小限制

PriorityBlockingQueue

支持优先级排序的无界阻塞队列

SynchronousQueue

单个元素的队列

LinkedTransferQueue

由链表组成的无界阻塞队列

LinkedBlockingDeque

由链表组成的双向阻塞队列

操作方法

image-20210818172323453

线程池

维护者多个线程,等待管理和分配,可以保证线程的充分利用,还能防止过分调度。控制可运行的线程数量,将任务放在队列中,线程不够用了则任务等待。

为什么需要线程池(主要特点)

降低资源消耗:重复利用一定的线程,频繁创建线程会消耗资源,还会降低系统稳定性

提高响应速度:当任务到达时,不需要等待创建线程的时间

便于管理

项目中用到了么

用到了,我负责的项目模块中有一个年度价值统计,需要创建统计模板定时发送信息。每个模板对应一个id,需要在hdfs中去找这个文件,遍历查找操作使用了线程池,因为hdfs遍历操作是一个费时任务,将此过程进行异步化,提高系统的吞吐量,使用ThreadPoolExcutor创建线程池,而不是使用executor(阿里手册不推荐,使用原生方法更能理解线程池运行规则,避免资源被耗尽的风险)

相关架构

在Java中使用Executor框架实现

怎么用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//创建线程池,有固定的线程数
ExecutorService executorService = Executors.newFixedThreadPool(3);
//创建一个线程
ExecutorService executorService1 = Executors.newSingleThreadExecutor();
//可扩容线程
ExecutorService executorService2 = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 100; i++) {
executorService2.execute(()->{
System.out.println(Thread.currentThread().getName()+"处理业务");
});
}
} catch (Exception e) {

} finally {
executorService2.shutdown();
}

这三种创建线程池的方法底层都是new的ThreadPoolExecutor

ThreadPoolExecutor的七个参数介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

corePoolSize:核心/常驻线程数

maximumPoolSize:最大线程数

keepAliveTime:活跃时间/存活时间

unit:时间单位

workQueue:阻塞队列

threadFactory:线程工厂,用于创建线程

handler:拒绝策略,没有能力处理

工作流程

image-20210818180336914

执行execute方法的时候才会创建线程,核心线程满了之后,其余的线程会创建到阻塞队列中,阻塞队列满之后,会在最大线程数创建线程,如果都满了,则会执行拒绝策略。最大线程池中的线程会优于阻塞队列中的线程执行,俗称插队

image-20210915161942

拒绝策略

1、AbortPolicy(默认):直接抛异常

2、CallerRunsPolicy:调用者模式,将任务回退到调用者

3、DiscardOldestPolicy:抛弃队列中等待最久的业务,把当前业务加入队列中

4、DiscardPolicy:不处理也不拒绝

在实际开发中,以上三种创建线程池的方式都不用,自定义线程池去实现

Java开发手册中说道

image-20210818182326249

1
2
3
4
5
6
7
8
9
10
//自定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy()
);

AQS

AbstractQueuedSynchronizer,在类java.util.concurrent.locks包下面,它是一个框架,可以构造出大量的同步器比如ReentrantLock、Semaphore

原理

如果被请求的共享资源空闲,则当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态。如果被请求资源被占用,则需要一套唤醒机制,AQS利用CLH实现,如果获取不到资源的线程则进入队列中

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请
求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。

两种资源共享方式

独占(Exclusive):只有一个线程可以执行如ReentrantLock

共享(Share):多个线程可以同时执行

Fork/Join分支合并框架

将大任务拆分为多个子任务去执行,合并最后的计算结果,并输出

分割fork,合并join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class MyTask extends RecursiveTask<Integer> {
//执行规则,拆分差值不超过10
private static final int VALUE = 10;
//开始值
private int begin;
//结束值
private int end;
//处理结果
private int result;
//创建有参构造
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
//写逻辑,拆分与合并
@Override
protected Integer compute() {
//判断相加值
if ((end - begin) <= VALUE) {
//相加
for (int i = begin; i <= end; i++) {
result += i;
}
} else {
//进一步拆分,获取到中间值
int mid = (begin + end)/2;
MyTask left = new MyTask(begin, mid);
MyTask right = new MyTask(mid+1, end);
//拆分
left.fork();
right.fork();
//合并
result = left.join() + right.join();
}
return result;
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建拆分对象
MyTask myTask = new MyTask(0, 100);
//创建合并池对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
//获取合并的结果
Integer result = forkJoinTask.get();
System.out.println(result);
//关闭池对象
forkJoinPool.shutdown();
}
}

输出

1
5050

异步回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//有返回值
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName());
int i = 1/0;
return 1;
});
//当xxx完成之后调用,t是返回值,u是异常
completableFuture1.whenComplete((t,u)->{
System.out.println(t+":"+u);
});
completableFuture1.get();
//无返回值
CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread().getName());
});
completableFuture2.get();

线程池的大小确定

线程过多会导致增加上下文切换成本

多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。
上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。
Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少

类比于实现世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。

如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的! CPU 根本没有得到充分利用。

但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。

有一个简单并且适用面比较广的公式:

CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何判断是 CPU 密集任务还是 IO 密集任务?

CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。单凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。

atomic

原子类,位于java.util.concurrent.atomic下,总共4类。基本类型,数组类型,引用类型,对象的属性修改类型