什么是JUC

java.util.concurrent工具包的简称,处理线程的一个工具包

进程

资源分配的最小单位,在系统中运行的一个应用程序,每个进程都有自己的内存空间和系统资源

线程

程序执行的最小单位,一个进程包含多个线程。程序调度的基本单元。
线程分为用户线程(默认线程)和守护线程。用户线程会完成程序的业务操作
守护线程做一些默默的事情比如GC。用户线程全部结束则守护线程结束

管程

monitor监视器,也就是锁(synchronized),方法调用如果有synchronized,执行线程前首先要成功持有管程。在方法执行期间,只有管程的线程,其他任何线程都无法获取到同一个管程。

线程状态

image-20210901110727

新建、就绪、阻塞、等待1、等待2、终结

waiting(不见不散—不来就一直等)

timed_waiting(过时不候)

wait和sleep

1、sleep是Thread的静态方法,wait是object方法,任何对象都能调用

2、sleep不会释放锁,也不占用锁。wait会释放锁,前提是先占用锁(代码在synchronized中)

3、都可以被interrupted方法中断

并发和并行

并发:有同时处理多个任务的能力,不一定要同时。一个处理器对多个任务。eg。秒杀,抢票

并行:同时执行任务的能力。多个处理器对多个任务。

区别在于是否可以同时

就好比并发是一个窗口很多人买票,并行是多个窗口很多人买票

synchronized

用来解决多线程访问资源的同步性,可以保证它修饰的方法或代码块在任意时刻只能有一个线程执行。

在1.6之前是重量级锁,之后进行了锁升级,减少锁操作带来的开销

多线程编程步骤

1、创建资源类

2、在资源类中创建属性和操作方法

3、创建多个线程,调用资源类的操作方法

4、防止虚假唤醒问题

LOCK接口

可重入锁,比synchronized拥有更广泛的操作

synchronized与lock区别

  1. 接口关键字。lock是一个接口,synchronized是关键字,内置语言实现
  2. 死锁问题。synchronized发生异常会自动释放占有的锁,因此不会导致死锁
    lock如果没有unlock释放锁,很可能死锁,所以最好放到finally中
  3. lock可以响应中断,synchronized不行,会一直等待
  4. 是否获取锁。lock可以知道有没有获取锁,synchronized不行
  5. lock可以提高多个线程读操作的效率

总结

竞争资源激烈的情况下,推荐使用lock

volatile

  1. 保证可见性,防止主存与缓存数据的不一致。指示jvm,到主存中读取数据
  2. 防止JVM指令重排,保证多线程环境下也能正常运行
  3. 不支持原子性
  4. 写的时候直接刷新到主内存中,读的时候直接从主内存中读取,复制到工作内存

应用场景

  • 单一赋值,不包含i++之类的复合赋值
  • 读多于写,结合内部锁减少内存开销
  • 单例模式 — 双重检查(先判空再加锁在判空)

volatile为什么能保证可见和重排序

内存屏障。其实就是JVM的一种指令,JMM的重排规则会要求Java编译器在生成JVM指令时插入特定的内存屏障指令,通过这些指令,实现了可见性和有序性
写屏障:告诉CPU在看到store指令,就必须把指令之前的所有数据都同步到主内存中
读屏障:在看到Load指令后,让工作内存和cpu高速缓存中的数据失效,重新回到主存中获取数据
细分四种屏障类型(了解)
image-20221123154502368

为什么volatile不保证原子性

volatile的可见性保证线程每次在读的时候数据是最新的。但多线程环境下,计算赋值的操作屡次出现,若在计算期间,被volatile修饰的值修改完毕(其他线程),则当前线程的计算作废,所以操作可能会出现丢失问题,进而导致数据不一致

分布式锁

数据库、redis、zk

特点:同一时间,只有一个客户端可以获取锁,其他客户端等待

ThreadLocal

让每个线程都有自己的本地变量,从而避免线程安全问题
(如何才能不争抢,1.用锁控制(synchronized或lock),2.人手一份(Threadlocal))

  • 不解决线程间的共享数据问题
  • 避免加锁,map仅自己线程访问

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ThreadLocalTest {
private List<String> messages = Lists.newArrayList();

public static final ThreadLocal<ThreadLocalTest> holder = ThreadLocal.withInitial(ThreadLocalTest::new);

public static void add(String message) {
holder.get().messages.add(message);
}

public static List<String> clear() {
List<String> messages = holder.get().messages;
holder.remove();

System.out.println("size: " + holder.get().messages.size());
return messages;
}

public static void main(String[] args) {
ThreadLocalTest.add("一枝花算不算浪漫");
System.out.println(holder.get().messages);
ThreadLocalTest.clear();
}
}

每个线程在往ThreadLocal里放值的时候,都会往自己的ThreadLocalMap里存,读也是以ThreadLocal作为引用,在自己的map里找对应的key,从而实现了线程隔离。其中的value是一个弱引用
key是当前ThreadLocal,value是当前值

  • 强引用:new出来的对象。宁愿抛OOM,也不会被回收
  • 软引用:被softreference修饰,在内存将要溢出的时候进行回收
  • 弱引用:被weekreference修饰,只要发生垃圾回收,那么就会被回收
  • 虚引用:被phantomreference修饰,用队列接收对象即将死亡的通知

实现场景

数据库。spring,threadLocal的存储类型是一个map,key是datasource,value是connection(为了应对多数据源),用threadLocal保证统一线程获取的是一个connection对象,从而保证一次事务所有操作需要在统一数据库连接上

ThreadLocal的内存泄露

详细文章:https://www.jianshu.com/p/dde92ec37bd1

ThreadLocal

ThreadLocal中包含了ThreadLocalMap,然而ThreadLocalMap的对象是在Thread中的,如果Thread没有结束,则ThreadLocalMap一直不会释放,假如ThreadLocalMap中设置了很多值,而且没有手动设置remove(),则可能会造成内存泄露。

为什么将map中的key设置为弱引用

假设如果存储的强引用,无法通过threadLocal==null来对threadLocal进行回收。
在业务代码中执行threadLocal==null操作,由于threadLocalMap的Entry强引用threadLocal,因此在gc的时候进行可达性分析,threadLocal依然可达,对threadLocal并不会进行垃圾回收,这样就无法通过threadLocal==null来对threadLocal进行回收,出现逻辑错误

如果为弱引用,在threadLocal生命周期里会尽可能的保证不出现内存泄漏的问题,达到安全的状态。
在threadLocal的生命周期里(set,getEntry,remove)里,都会针对key为null的脏entry进行处理。但还可能出现内存泄漏,所以还需要手动remove

为什么能实现线程隔离

在于其中的map,只初始化一次,分配一块内存地址

什么是内存泄漏

申请完内存,用完没有及时的进行释放,自己又没法在用,系统也无法进行回收

线程间的通信

让线程按照既定的步骤去执行

可能会出现的问题:虚假唤醒,就是自己本来是wait状态,然后别人又进行了唤醒,那么他就会执行下面的逻辑,所以状态需要一直进行判断,所以判断状态需要放到while中

线程间的定制化通信

线程按照顺序执行A执行完通知B执行完通知C,以此类推,执行10轮

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public class DIY {
int flag = 1;
Lock lock = new ReentrantLock();
Condition condition1 = lock.newCondition();
Condition condition2 = lock.newCondition();
Condition condition3 = lock.newCondition();

void print3(int loop) throws Exception{
lock.lock();
try {
while (flag != 1) {
condition1.await();
}
for (int i = 0; i < 3; i++)
System.out.println(Thread.currentThread().getName()+i+"轮数:"+loop);
flag = 2;
//通知给B
condition2.signal();
} finally {
lock.unlock();
}
}

void print6(int loop) throws Exception{
lock.lock();
try {
while (flag != 2) {
condition2.await();
}
for (int i = 0; i < 6; i++)
System.out.println(Thread.currentThread().getName()+i+"轮数:"+loop);
flag = 3;
//通知给B
condition3.signal();
} finally {
lock.unlock();
}
}

void print9(int loop) throws Exception{
lock.lock();
try {
while (flag != 3) {
condition3.await();
}
for (int i = 0; i < 9; i++)
System.out.println(Thread.currentThread().getName()+i+"轮数:"+loop);
flag = 1;
//通知给B
condition1.signal();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
DIY diy = new DIY();
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
diy.print3(i);
}
} catch (Exception e) {

}
},"a").start();

new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
diy.print6(i);
}
} catch (Exception e) {

}
},"b").start();

new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
diy.print9(i);
}
} catch (Exception e) {

}
},"c").start();
}
}

线程安全

线程安全的核心理念:要么只读,要么加锁

解决线程安全是进阶的重要能力

集合

CopyOnWriteArrayList代替ArrayList

  • 读操作没有锁,因为是数组直接替换,不需要修改,没有安全问题
  • 写操作加了ReentrantLock,保证同步,避免多线程创建多个副本的问题

CopyOnWriteArraySet代替ArraySet

ConcurrentHashMap代替HashMap

跳表

对于单链表,查询需要从头查到尾,效率低下。调表的意义在于可以跳着查,提高效率,与平衡树不同的是,平衡树添加和删除需要对全局进行调整,而跳表是局部的。这直接导致锁范围的不同,在高并发场景下,会有更好的性能

本质是维护多个链表,没上面一层都是下面的子集,如图找18的元素,可以进行跳跃式查找,效率得到明显的提升
20210918114140

可以得出,跳表是一种利用空间换时间的算法。哈希不保证有序性,如果需要有序性,可以考虑跳表的实现实现ConcurrentSkipListMap

AQS组件

wait() —> CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"号线程执行了");
//每执行一个线程计数器-1
countDownLatch.countDown();
},i+"").start();
}
//如果没有变为0则等待
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"最后执行");
}
}

notify() —> Senaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SemaphoreDemo {
public static void main(String[] args) {
//信号量设置为3,表示线程数不能超过此值,超过会等待
Semaphore semaphore = new Semaphore(3);

//创建6个线程
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
//抢占资源
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到了车位");
//留有时间抢占资源
Thread.sleep(new Random().nextInt(3));
System.out.println(Thread.currentThread().getName()+"离开了车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放,别的线程唤醒继续抢车位
semaphore.release();
}
},i+"").start();
}
}
}

多线程锁的8种情况

定义两个方法,都加synchronized,创建两个线程,先后调用方法1,2,线程中间睡1s

①标准调用,1—2

②在第一个方法中睡4s,整体会先睡4s,然后正常调用

③新建普通方法3替换2,不加synchronized,调用顺序3—1

④两个对象调用,先2后1,锁的是一个对象

⑤方法前加static,跟②相同

⑥方法前加static,用两个对象调用,跟②相同,类对象是一个

⑦方法1静态,2普通,先调用普通,在调用静态

⑧⑦的条件,两个对象调用,结果同⑦

总结

对于普通同步方法,锁的是当前实例对象

对于静态同步方法,锁的是类对象

同步方法块,锁的是括号中配置的对象

各种锁

乐观锁与悲观锁

悲观锁:对任何操作都是悲观的,认为它访问的时候别人也会访问,所以在操作前会进行上锁,操作后解锁,安全,但效率低,只能一个一个进行操作。比如synchronized和lock都是悲观锁
image-20210901111923904

乐观锁,对任何操作都是乐观的,认为访问的时候别人不会访问
每次操作会给定一个版本号,如果版本号不同则不允许操作
最常使用的是CAS自旋算法
适合读多写少的场景
image-20210901112004815

八个案例说明锁的场景
完整的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class MyLock {
public static void main(String[] args) {
Phone phone = new Phone();
Thread thread1 = new Thread(() -> {
phone.sendMail();
});

// 保证线程的先后顺序
try {
TimeUnit.MICROSECONDS.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}

Thread thread2 = new Thread(() -> {
phone.sendSms();
});
thread1.start();
thread2.start();
}
}

class Phone {
public synchronized void sendMail() {
System.out.println("sendMail");
}

public synchronized void sendSms() {
System.out.println("sendSms");
}
}

问以下场景的输出是什么
Q1:直接运行以上程序
A:sendMail(1)、sendSms(2)
Q2:sendMail加入sleep3秒中
A:1、2(sleep不释放锁,锁的是所有同步方法)
Q3:向第二个thread中加入hello无锁方法
A:hello,1(不和你争)
Q4:有两个对象调用
A:2,1(不是一个对象,不争抢同一把锁)
Q5:同步方法变为static方法
A:1,2(类锁,无论几个对象锁的都是当前类)
Q6:在Q5的基础上,两个对象调用
A:1,2
Q7:1个静态,1个同步
A:2,1(静态与同步之间没有竞争关系)
Q8:在Q7基础上两个对象调用
A:2,1

字节码synchronized
同步代码块使用的是monitorenter和monitorexit指令控制的,有两个exit,一个用于正常退出,一个用于异常退出
普通同步方法会检查ACC_SYNCHRONIZED是否被设置,如果有当前线程会持有monitor锁,然后执行方法,最后完成后后释放
静态同步方法多了一个ACC_STATIC

独占锁共享锁

独占锁:只能被一个线程享有。synchronized和lock实现类就是独占锁,并发不行,性能较低
image-20210901112224415

共享锁:可以被多个线程享有。如果一个线程加了共享锁,那么就不能加独占锁。ReentrantReadWriteLock就是共享锁
image-20210901112357119

独占锁和共享锁的体现

互斥锁 :独占锁的体现,具有唯一性和排他性。
image-20210901112740335
读写锁:共享锁的体现。读写锁管理一组锁,一个是只读的锁,一个是写锁。
读锁:共享锁,会发生死锁,可以多个一起读

写锁:独占锁,会发生死锁,只能一个进行写
ReentrantReadWriteLock是读写锁,读读可以共享,提高性能,存在的问题,存在锁饥饿的问题,一直读的时候,程序没法写。读不可以写,写可以读。
image-20210901112911028

读写锁的降级

写入锁降低为读锁,目的提高数据的可见性

jdk8说明

1、获取写锁

2、获取读锁

3、释放写锁

4、释放读锁

此时读锁不能升级为写锁

表锁和行锁

表锁:对整张表上锁,对某条记录修改,整个表都不能访问,必须等其释放锁,不会有死锁

行锁:操作哪条记录就对哪条记录上锁,别人还可以访问别的记录,会有死锁

公平锁和非公平锁

公平锁:雨露均沾,每个线程都会分到,但不是绝对平均

非公平锁:没有顺序,不一定每个线程都会分到,分配不均
默认new出来的是非公平锁,非公平锁可能会导致线程饿死的情况,活都给别的线程分配了,分配不均,但执行效率高。而公平锁会做到雨露均沾,但不保证绝对平均,效率相对会低一点
image-20210901113319936

可重入锁

锁里面还能调锁,又叫递归锁。synchronized是隐式的,lock是显示的,都是可重入锁
应用场景:递归调用
image-20210901113357915

1
2
3
4
5
6
7
8
public synchronized void mehtodA() throws Exception{
// Do some magic tings
mehtodB();
}

public synchronized void mehtodB() throws Exception{
// Do some magic tings
}

自旋锁

线程在没有锁的时候不会被挂起,而是处于一个忙循环状态,类似等待的状态,这个又叫自旋
image-20210901113736015

目的:减少被挂起的几率,线程的唤醒和挂起很耗费资源,但一致自旋也是耗性能,所以并不适合于时间长的并发情况
如下代码,CAS 操作如果失败就会一直循环获取当前 value 值然后重试

1
2
3
4
5
6
7
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}

在JDK1.6又引入了自适应自旋,这个就比较智能了,自旋时间不再固定,由前一次在同一个锁上的自旋时间以及锁的拥有者的状态来决定。如果虚拟机认为这次自旋也很有可能再次成功那就会次序较多的时间,如果自旋很少成功,那以后可能就直接省略掉自旋过程,避免浪费处理器资源。

死锁

互相等待对方去释放锁,没有外力干涉,就无法执行下去。A需要B持有的资源,但B不释放,B需要A持有的资源,但A不释放,都在等对方释放

产生原因

1、系统资源不足

2、进程推进不合适

3、资源分配不当

如何查看

jstack,jvm自带工具,jstack 进程号,查看是否死锁

Java案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class DeadLock {
public static void main(String[] args) {
//创建两个对象
Object o1 = new Object();
Object o2 = new Object();
new Thread(()->{
System.out.println("LockA 开始执行");
synchronized (o1) {
System.out.println("持有锁A,试图获取B");
// 为了第二个线程也能启动?
try {
TimeUnit.MICROSECONDS.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
synchronized (o2) {
System.out.println("获取到B");
}
}
}).start();

new Thread(()->{
System.out.println("LockB 开始执行");
synchronized (o2) {
System.out.println("持有锁B,试图获取A");
synchronized (o1) {
System.out.println("获取到A");
}
}
}).start();
}
}

结果

1
2
3
4
LockA 开始执行
LockB 开始执行
持有锁A,试图获取B
持有锁B,试图获取A

锁升级

提升锁性能,减少获取和释放锁带来的消耗,提供4种状态无锁、偏向锁、轻量级锁和重量级锁
无锁:乐观锁

偏向锁:指它会偏向于第一个访问锁的线程,如果在运行过程中,只有一个线程访问加锁的资源,不存在多线程竞争的情况,那么线程是不需要重复获取锁的,这种情况下,就会给线程加一个偏向锁。第一次访问时id为空,jvm为其线程设置id

主要作用:当同一段同步代码被一个线程访问多次,那么便在后续访问时自动获得锁
Java15逐步废除偏向锁,维护成本变高

轻量级锁:当线程竞争变得比较激烈时,偏向锁就会升级为轻量级锁,轻量级锁认为虽然竞争是存在的,但是理想情况下竞争的程度很低,通过自旋方式等待上一个线程释放锁。判断id是否一致若一致使用该对象,不一致升级为轻量级锁
根据上一次自旋的状态来确定,若上次成功,最下次自旋次数增加。失败则减少

重量级锁:也是互斥锁。如果线程并发进一步加剧,线程的自旋超过了一定次数,或者一个线程持有锁,一个线程在自旋,又来了第三个线程访问时(反正就是竞争继续加大了),轻量级锁就会膨胀为重量级锁,重量级锁会使除了此时拥有锁的线程以外的线程都阻塞。

锁优化

锁粗化:能不上锁就不上,范围能小就小

锁消除:指虚拟机编译器在运行时检测到了共享数据没有竞争的锁,从而将这些锁进行消除。
比如这个for循环

1
2
3
4
5
6
for (int i = 0; i < 1000; i++) {
StringBuffer stringBuffer = new StringBuffer();
stringBuffer.append("dali");
stringBuffer.append("技术分享");
stringBuffer.toString();
}

已知StringBuffer是一个线程安全类,但stringBuffer变量属于局部变量,存于栈中,栈为线程私有本来就是线程安全,而append方法是一个同步方法,只会白白浪费系统资源,为了提升效率,虚拟机帮我们消除了这些同步锁,这个过程就被称为锁消除

1
2
3
4
5
6
@Override
public synchronized StringBuffer append(String str) {
toStringCache = null;
super.append(str);
return this;
}

锁总结

image-20221118211931615

callable接口

创建线程的方式

1、继承Thread类

2、实现Runnable接口

3、callable接口(有返回值的Runnable)

4、线程池

传统创建线程的方式,无法获取线程返回结果,为了支持该功能,增加了callable接口

与Runnable的区别

1、是否有返回值,callbable有

2、是否抛出异常,callable抛

3、实现方法名不同,callable实现call(),另一个是run()

如何利用callable创建线程

Thread没有提供callable的参数,那么就需要找到一个类,能够建立起runnable与callable的关系

Runnable有一个实现类,FutureTask,可以传递callable接口

FutureTask:在不影响主线程的情况下,单开一个线程,做其他事情,最后在进行汇总

创建一个线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Demo1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask futureTask = new FutureTask(MyThread::new);
new Thread(futureTask,"aa").start();
while (!futureTask.isDone()) {
//不完成,输出wait
System.out.println("wait...");
}
//调用get方法
System.out.println(futureTask.get());
//调用第二次的时候直接返回
System.out.println(futureTask.get());
}
}
class MyThread implements Callable{

@Override
public Object call() throws Exception {
return null;
}
}

JUC辅助类

1、减少计数

CountDownLatch

基于AQS实现,在构建对象的时候传入的值会赋给AQS的关键变量state,-1操作是利用CAS将state-1,执行await方法,判断state是否为0,不为0则加入队列中,将该线程进行阻塞(头节点除外,会自旋等待,当state为0时,将剩余的在队列中阻塞的节点一并唤醒)。

await实际上让头节点一直在等待state为0时,释放等待的线程

设置一个计数器类,countDown()方法可以让初始值-1,允许一个线程或多个线程等待,直到这些线程完成他们的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"号线程执行了");
//每执行一个线程计数器-1
countDownLatch.countDown();
},i+"").start();
}
//如果没有变为0则等待
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"最后执行");
}
}

2、循环栅栏

CyclicBarrier,当线程到达某状态后,暂停下来等待其他的线程,所有线程都到达之后才会继续执行,阻塞的是任务线程,主线程是不受影响的

线程到达等待数执行条件,相较于CountDownLatch来说它是可复用的。它没有使用AQS的state变量,借助ReentrantLock和condition等待唤醒,在构建该对象时,传入的值会赋给内部维护的count变量,也会赋值给parties变量(复用的关键),调用await会使count-1,利用ReentrantLock保护线程的安全性,如果count不为0,则添加进condition队列中,当count=0,将队列中的线程全部唤醒,并将parties的值重新赋值为count实现复用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class CyclicBarrierDemo {
public static void main(String[] args){
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("线程等待数已达条件");
});
for (int i = 0; i < 7; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"线程创建完毕");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},i+"").start();
}
}
}

3、信号量机制

Semaphore,信号量机制,到达某个临界值会处于等待状态,直到线程被释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SemaphoreDemo {
public static void main(String[] args) {
//信号量设置为3,表示线程数不能超过此值,超过会等待
Semaphore semaphore = new Semaphore(3);

//创建6个线程
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
//抢占资源
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到了车位");
//留有时间抢占资源
Thread.sleep(new Random().nextInt(3));
System.out.println(Thread.currentThread().getName()+"离开了车位");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//释放,别的线程唤醒继续抢车位
semaphore.release();
}
},i+"").start();
}
}
}

阻塞队列

image-20210818170912835

当队列元素满之后就不能放了,就阻塞。如果元素为空那么就不能取了,就阻塞

为什么使用:不需要关系什么时候阻塞线程,什么时候唤醒线程。阻塞队列已经实现了

分类及方法

BlockingDeque与BlockingQueue

ArrayBlockingQueue(常用)数组的队列

基于数组实现,内部维护了一个定长的数组,以便缓存队列中的数据对象,是一个常用的阻塞队列

LinkedBlockingQueue(常用)链表的队列

基于链表实现,大小默认值为int的最大值,是一个常用的阻塞队列

DealyQueue

使用优先级队列实现延迟无界阻塞队列,没有大小限制

PriorityBlockingQueue

支持优先级排序的无界阻塞队列

SynchronousQueue

单个元素的队列

LinkedTransferQueue

由链表组成的无界阻塞队列

LinkedBlockingDeque

由链表组成的双向阻塞队列

操作方法

image-20210818172323453

线程池

维护者多个线程,等待管理和分配,可以保证线程的充分利用,还能防止过分调度。控制可运行的线程数量,将任务放在队列中,线程不够用了则任务等待。

为什么需要线程池(主要特点)

降低资源消耗:重复利用一定的线程,频繁创建线程会消耗资源,还会降低系统稳定性

提高响应速度:当任务到达时,不需要等待创建线程的时间

便于管理

项目中用到了么

用到了,我负责的项目模块中有一个年度价值统计,需要创建统计模板定时发送信息。每个模板对应一个id,需要在hdfs中去找这个文件,遍历查找操作使用了线程池,因为hdfs遍历操作是一个费时任务,将此过程进行异步化,提高系统的吞吐量,使用ThreadPoolExcutor创建线程池,而不是使用executor(阿里手册不推荐,使用原生方法更能理解线程池运行规则,避免资源被耗尽的风险)

相关架构

在Java中使用Executor框架实现

怎么用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//创建线程池,有固定的线程数
ExecutorService executorService = Executors.newFixedThreadPool(3);
//创建一个线程
ExecutorService executorService1 = Executors.newSingleThreadExecutor();
//可扩容线程
ExecutorService executorService2 = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 100; i++) {
executorService2.execute(()->{
System.out.println(Thread.currentThread().getName()+"处理业务");
});
}
} catch (Exception e) {

} finally {
executorService2.shutdown();
}

这三种创建线程池的方法底层都是new的ThreadPoolExecutor

ThreadPoolExecutor的七个参数介绍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

corePoolSize:核心/常驻线程数

maximumPoolSize:最大线程数

keepAliveTime:活跃时间/存活时间

unit:时间单位

workQueue:阻塞队列

threadFactory:线程工厂,用于创建线程

handler:拒绝策略,没有能力处理

工作流程

image-20210818180336914

执行execute方法的时候才会创建线程,核心线程满了之后,其余的线程会创建到阻塞队列中
阻塞队列满之后,会在最大线程数创建线程
如果都满了,则会执行拒绝策略。最大线程池中的线程会优于阻塞队列中的线程执行,俗称插队

image-20210915161942

拒绝策略

1、AbortPolicy(默认):直接抛异常

2、CallerRunsPolicy:将任务回退到调用者,又称调用者模式

3、DiscardOldestPolicy:抛弃队列中等待最久的业务,把当前业务加入队列中

4、DiscardPolicy:不处理也不拒绝

在实际开发中,以上三种创建线程池的方式都不用,自定义线程池去实现。Java开发手册中说道

image-20210818182326249

1
2
3
4
5
6
7
8
9
10
//自定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy()
);

AQS

AbstractQueuedSynchronizer(抽象的队列同步器),在类java.util.concurrent.locks包下面,它是一个框架,可以构造出大量的同步器比如ReentrantLock、Semaphore
主要用于解决锁分配给谁的问题(FIFO和state)
是什么:volatile+cas机制实现的锁模板,保证了代码的同步性和可见性,而aqs封装了线程阻塞等待挂起,解锁唤醒其他线程的逻辑,aqs子类只需根据状态变量,判断是否key获取锁,是否释放锁,使用LockSupport挂起,唤醒线程即可

原理

如果被请求的共享资源空闲,则当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态。如果被请求资源被占用,则需要一套唤醒机制,AQS利用CLH实现,如果获取不到资源的线程则进入队列中

CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS 是将每条请
求共享资源的线程封装成一个 CLH 锁队列的一个结点(Node)来实现锁的分配。

两种资源共享方式

独占(Exclusive):只有一个线程可以执行如ReentrantLock

共享(Share):多个线程可以同时执行

Fork/Join分支合并框架

将大任务拆分为多个子任务去执行,合并最后的计算结果,并输出

分割fork,合并join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class MyTask extends RecursiveTask<Integer> {
//执行规则,拆分差值不超过10
private static final int VALUE = 10;
//开始值
private int begin;
//结束值
private int end;
//处理结果
private int result;
//创建有参构造
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
//写逻辑,拆分与合并
@Override
protected Integer compute() {
//判断相加值
if ((end - begin) <= VALUE) {
//相加
for (int i = begin; i <= end; i++) {
result += i;
}
} else {
//进一步拆分,获取到中间值
int mid = (begin + end)/2;
MyTask left = new MyTask(begin, mid);
MyTask right = new MyTask(mid+1, end);
//拆分
left.fork();
right.fork();
//合并
result = left.join() + right.join();
}
return result;
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建拆分对象
MyTask myTask = new MyTask(0, 100);
//创建合并池对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
//获取合并的结果
Integer result = forkJoinTask.get();
System.out.println(result);
//关闭池对象
forkJoinPool.shutdown();
}
}

输出

1
5050

异步回调

Furture是Java5新增的接口,提供了一步并行计算的功能

需要用到Runnable接口,Callable接口,Future接口和具体实现类

Runnable接口与Callable接口的比较

1.Runnable无返回值,不抛异常
2.Callable有返回值,抛异常

FutureTask怎么来的

需要具备三个能力
1.多线程 — runnable
2.有返回 — callable
3.有异步 — future

public FutureTask(Callable<V> callable) {}

image-20221115142128567

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Example {
@SneakyThrows
public static void main(String[] args) {
FutureTask<String> task = new FutureTask<>(new MyThread());
Thread t1 = new Thread(task, "t1");
t1.start();
System.out.println(task.get());
}

}
@Data
class MyThread implements Callable<String> {

@Override
public String call() throws Exception {
System.out.println("yes!");
return "hello world";
}
}

FutureTask如何变为CompletableFuture

FutureTask的缺点
1.get阻塞。只要调用了,就会去找当前furture线程,如果没有完成,则会阻塞其他线程的运行。所以一般放在程序后面,假如需要设置自己的时间,过时不候,如果到规定时间无法拿回结果,那么就会抛出超时异常
2.isDone轮询会耗费cpu资源,也不见得会返回结果
3.总结FutureTask对结果的获取不是很友好,所以出现了CompletableFuture可以用声明式的方式优雅的处理需求
4.CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方

CompletableFuture

1.官方不建议直接使用new构造方法创建对象
2.run开头的无返回值(短的没有收获),supply开头没有返回值(长的才配拥有)
3.减少了阻塞和轮询
4.java8增加
5.可以发现并没有使用线程池,因为默认使用了ForkJoinPool线程池
6.用thenRun是当前指定线程池,并且会影响之后的。thenRunASync异步,只会影响当前线程池
7.指定的线程池可能用不上,会默认用快的(main)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//有返回值
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(()->{
System.out.println(Thread.currentThread().getName());
int i = 1/0;
return 1;
});
//当xxx完成之后调用,t是返回值,u是异常
completableFuture1.whenComplete((t,u)->{
System.out.println(t+":"+u);
});
completableFuture1.get();
//无返回值
CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(()->{
System.out.println(Thread.currentThread().getName());
});
completableFuture2.get();

线程池的大小确定

线程过多会导致增加上下文切换成本

多线程编程中一般线程的个数都大于 CPU 核心的个数,而一个 CPU 核心在任意时刻只能被一个线程使用,为了让这些线程都能得到有效执行,CPU 采取的策略是为每个线程分配时间片并轮转的形式。当一个线程的时间片用完的时候就会重新处于就绪状态让给其他线程使用,这个过程就属于一次上下文切换。概括来说就是:当前任务在执行完 CPU 时间片切换到另一个任务之前会先保存自己的状态,以便下次再切换回这个任务时,可以再加载这个任务的状态。任务从保存到再加载的过程就是一次上下文切换。
上下文切换通常是计算密集型的。也就是说,它需要相当可观的处理器时间,在每秒几十上百次的切换中,每次切换都需要纳秒量级的时间。所以,上下文切换对系统来说意味着消耗大量的 CPU 时间,事实上,可能是操作系统中时间消耗最大的操作。
Linux 相比与其他操作系统(包括其他类 Unix 系统)有很多的优点,其中有一项就是,其上下文切换和模式切换的时间消耗非常少

类比于实现世界中的人类通过合作做某件事情,我们可以肯定的一点是线程池大小设置过大或者过小都会有问题,合适的才是最好。

如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的! CPU 根本没有得到充分利用。

但是,如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。

有一个简单并且适用面比较广的公式:

CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
如何判断是 CPU 密集任务还是 IO 密集任务?

CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。单凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。

atomic

原子类,位于java.util.concurrent.atomic下,总共4类。基本类型,数组类型,引用类型,对象的属性修改类型

LockSupport

对线程对待机制(wait和notify)的优化与提升。

CAS

compare and swap比较并交换
有3个操作数,内存数V,旧的预期值A,要修改的值B,当且仅当V==A,则将V修改为B,否则什么都不做或重来(自旋)

非阻塞原子性操作,由硬件保证,CAS是一条CPU的原子指令(由unsaef类提供的一种CAS方法)
执行指令时会发生什么?
会判断当前系统是否为多核系统,如果是就给总线加锁,实际上利用了CPU资源

unsafe类

是CAS的核心类,由Java的native方法访问底层系统,该类可直接操作内存中的数据
CAS是一种原语(操作系统范畴,由若干条指令组成,用于完成一个功能),原语必须是连续的,一起成功或失败,也就是说CAS是一条CPU的原子指令,不会造成数据不一致的问题

CAS的问题

如果自旋时间过长会导致开销很大
ABA问题,尽管结果是成功的也不能保证过程就没有问题。
比如ab线程,a修改值v被挂起,此时b线程修改了v为v1,又改了回去v,这时候a拿到资源发现还是预期v,那么也会进行更新。解决方式:用版本号

热点点赞器 — LongAdder,用于替换AtomicLong,性能好,减少乐观锁重试次数

LongAdder为什么这么快?

空间换时间,降低冲突概率。
基本思路是分散热点,将值分散到cell数组中,不同线程会命中到数组的不同槽中,各个线程只能对自己那个槽的值进行CAS操作,热点被分散,冲突概率降低。获取结果值,只需将各个槽累计相加即可

image-20221205182825036