封面来源:碧蓝航线 箱庭疗法 活动CG
本文参考:尚硅谷 2019-06 周阳 JUC
1. JUC 是什么
1.1 什么是 JUC
JUC 全称叫 java.util.concurrent
,这是一个 Java 中与并发有关的工具包,JUC 就是这个工具包的简称,是在并发编程中使用的工具类。
JUC 由三个构成,分别是:java.util.concurrent
、java.util.concurrent.atomic
和 java.util.concurrent.locks
,说人话就是:Java 并发包、Java 并发原子包和 Java 并发 lock 包。
1.2 知识回顾
进程与线程
进程:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。
线程:通常在一个进程中可以包含若千个线程,当然一个进程中至少有一个线程,不然没有存在的意义。线程可以利用进程所拥有的资源,在引入线程的操作系统中,通常都是把进程作为分配资源的基本单位,而把线程作为独立运行和独立调度的基本单位。由于线程比进程更小,基本上不拥有系统资源,故对它的调度所付出的开销就会小得多,能更高效地提高系统多个程序间并发执行的程度。
说人话:
进程与操作系统有关,和编程语言无关,操作系统中后台运行的每一个程序就是一个进程。
使用QQ,查看进程一定有一个 QQ.exe 的进程,我可以用 qq 和 A 文字聊天,和 B 视频聊天,给 C 传文件,给 D 发一段语言,QQ支持录入信息的搜索。
写论文时,用 word 写论文,同时用 QQ 音乐放音乐,还用 QQ 聊天,这涉及到多个进程。
如果 word 如没有保存,突然停电关机,再通电后打开 word 可以恢复之前未保存的文档,word 还会检查你的拼写,这涉及到两个线程:容灾备份,语法检查。
并发与并行
并发:把任务在不同的时间点交给处理器进行处理。在同一时间点,任务并不会同时运行。
并行:把每一个任务分配给每一个处理器独立完成。在同一时间点,任务一定是同时运行。
说人话:
我正在吃饭,突然一个电话打了过来:
并发是一手筷子,一手电话,说一句话,咽一口饭。
并行就是咽一口饭同时说一句话,而这光靠一张嘴是办不到的,至少两张嘴。
参考链接:并发与并行的区别是什么? 回答者首页:土豆炖上小牛肉
wait 和 sleep 的区别
这两个方法都可以让当前线程暂停,它们有以下区别:
1、释放资源角度
wait 是放开手去睡,放开手里的锁。
sleep 是握紧手去睡,醒了手里还有锁。
2、来源角度
sleep 来自 Thread
类,而 wait来自 Object
类。sleep 是 Thread 中的静态类方法,谁调用的谁去睡觉,即使在线程 A 里调用了 B 的 sleep 方法,实际上还是线程 A 休眠,要让 B 线程休眠要在 B 的代码中调用sleep。
3、使用范围
wait,notify 和 notifyAll 方法只能在同步控制方法或者同步控制块里面使用,而 sleep 可以在任何地方使用。
4、异常捕获
sleep 必须捕获异常,而 wait、notify 和 notifyAll 三个方法不需要捕获异常。
线程的状态
可以查看一下源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public enum State { NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED; }
两个等待的区别:
WAITING:属于一直等,硬等,不见不散
TIMED_WAITING:加了个 TIMED 的前缀,会指定时间。假设就等 10 秒钟,10 秒到就撤,过时不候
2. 多线程经典案例
2.1 买票案例
经典写法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 package com.yang.main;public class SaleTicket { public static void main (String[] args) { Ticket ticket = new Ticket (); new Thread (new Runnable () { @Override public void run () { for (int i = 0 ; i <= 40 ; i++) { ticket.saleTicket(); } } }, "A" ).start(); new Thread (new Runnable () { @Override public void run () { for (int i = 0 ; i <= 40 ; i++) { ticket.saleTicket(); } } }, "B" ).start(); new Thread (new Runnable () { @Override public void run () { for (int i = 0 ; i <= 40 ; i++) { ticket.saleTicket(); } } }, "C" ).start(); } } class Ticket { private int number = 30 ; public synchronized void saleTicket () { if (number > 0 ) { System.out.println(Thread.currentThread().getName() +"\t卖出第:" +(number--) +"张票,\t还剩下" +number+"张票" ); } } }
上面的代码是最开始学习多线程时就会接触到的代码,也是最经典的卖票系统。
那这和 JUC 有什么关系吗?
在实际操作中,我们一般不用 synchronized
关键字,而是使用 JUC 提供了 API。为什么呢?
上述案例中,synchronized
关键字是作用在方法之上的,就是说当前只有一个线程可以进入那个方法,那一整个方法都被锁了。如果某个方法内容很多,却只有几行需要被锁住,这时候使用 synchronized
关键字就不很“优雅”了。
打个比方,在外上厕所时会将厕所里指定的坑位的门关上,解决完了,开门,人走,这个过程就和使用锁是一个道理。synchronized
关键字就很粗暴,我要上厕所,然后把厕所的外大门关了,谁都不许进来,属于一杆打死一船人。
使用 Lock 锁
针对这个问题,可以使用 JDK 提供的 Lock
接口,既然有接口那么就有实现类,使用 ReentrantLock
实现类(可重入锁)。
修改 Ticket
资源类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Ticket { private int number = 30 ; private Lock lock = new ReentrantLock (); public void saleTicket () { lock.lock(); try { if (number > 0 ) { System.out.println(Thread.currentThread().getName() + "\t卖出第:" + (number--) + "张票,\t还剩下" + number + "张票" ); } } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
main()
方法可以使用 Lambda 表达式进行优化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static void main (String[] args) { Ticket ticket = new Ticket (); new Thread (() -> { for (int i = 0 ; i <= 40 ; i++) { ticket.saleTicket(); } }, "A" ).start(); new Thread (() -> { for (int i = 0 ; i <= 40 ; i++) { ticket.saleTicket(); } }, "B" ).start(); new Thread (() -> { for (int i = 0 ; i <= 40 ; i++) { ticket.saleTicket(); } }, "C" ).start(); } }
然后进行测试,会发现测试通过。
关于 Lambda 表达式的具体使用方法,可以参考【Java 多线程基础】一文。
synchronized 和 Lock 的区别
类别
synchronized
Lock
存在层次
Java 的关键字,JVM 层面
一个接口
锁的释放
1、获取锁的线程执行完同步代码,释放锁;2、线程执行发生异常,JVM 会让线程释放锁
在 finally 中释放锁,否则会线程死锁
锁的获取
假设 A 线程获得锁,B 线程等待。如果 A 线程阻塞,B 线程会一直等待
Lock 有多种方式获取锁,Lock 会尝试获得锁,线程可以不用一直等待
锁的状态
无法判断
可以判断
锁的类型
可重入、不可中断、非公平
可重入、可中断、可公平(两者皆可)
适用场景
少量同步代码问题
大量同步代码问题
2.2 生产者与消费者
现在有这样一个题目:现有两个线程,可以操作初始值为 0 的一个变量,实现一个线程对该变量加 1,一个线程对该变量减 1,并实现交替,来 10 轮,变量初始值为 0。
简单来说,一个变量初始为 0,一个线程先判断它是否为 0,为 0 时就加 1,另一个线程判断它是否不为 0,不为 0 时就减 1。
经典写法
使用 synchronized
关键字、wait()
和 notifyAll()
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 package com.yang.main;public class ThreadWaitNotifyDemo { public static void main (String[] args) { AirConditioner airConditioner = new AirConditioner (); new Thread (()->{ for (int i = 0 ; i < 10 ; i++) { try { airConditioner.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A" ).start(); new Thread (()->{ for (int i = 0 ; i < 10 ; i++) { try { airConditioner.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B" ).start(); } } class AirConditioner { private int number = 0 ; public synchronized void increment () throws InterruptedException { if (number != 0 ) { this .wait(); } number++; System.out.println(Thread.currentThread().getName() + "\t" + number); this .notifyAll(); } public synchronized void decrement () throws InterruptedException { if (number == 0 ) { this .wait(); } number--; System.out.println(Thread.currentThread().getName() + "\t" + number); this .notifyAll(); } }
运行后的测试结果就是:加一减一循环十次。
防止虚假唤醒
上面的案例是最简单的生产者消费者问题,那如果有多个生产者或消费者呢?比如有两个生产者和两个消费者,这时候在 main()
函数中创建四个线程执行代码,运行后就会出现问题。主要是因为采用了 if
进行判断,当一个线程 wait 后,后续将其唤醒时,不会再进行二次判断,这时候就会出现数据异常,对上述案例来说,就会多加一。这就是线程的虚假唤醒。
那应该怎么做?
在 JDK 文档的 notify()
方法中有这样一段字:
多线程交互中,必须防止多线程的虚假唤醒,即:判断只用 while,不用 if。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 package com.yang.proAndCon;public class ThreadWaitNotifyDemo2 { public static void main (String[] args) { AirConditioner2 airConditioner2 = new AirConditioner2 (); new Thread (()->{ for (int i = 0 ; i < 10 ; i++) { try { airConditioner2.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "A" ).start(); new Thread (()->{ for (int i = 0 ; i < 10 ; i++) { try { airConditioner2.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "B" ).start(); new Thread (()->{ for (int i = 0 ; i < 10 ; i++) { try { airConditioner2.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "C" ).start(); new Thread (()->{ for (int i = 0 ; i < 10 ; i++) { try { airConditioner2.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "D" ).start(); } } class AirConditioner2 { private int number = 0 ; public synchronized void increment () throws InterruptedException { while (number != 0 ) { this .wait(); } number++; System.out.println(Thread.currentThread().getName() + "\t" + number); this .notifyAll(); } public synchronized void decrement () throws InterruptedException { while (number == 0 ) { this .wait(); } number--; System.out.println(Thread.currentThread().getName() + "\t" + number); this .notifyAll(); } }
运行上述代码后,会发现输出结果不再异常。
理解一下虚假唤醒: 假设现在有两对生产者消费者,其中 A 和 B 是生产者,C 和 D 是消费者(此时阻塞判断使用if
)。假设生产者 A 先运行,此时没有任何商品,生产者 A 阻塞判断失败,干活,产品加一。然后生产者 A 再次运行,此时有一件商品,阻塞判断成功,A 被阻塞。生产者 B 运行,此时有一件商品,阻塞判断成功,B 也被阻塞。消费者 C 运行,此时有一件商品,商品数量不为 0,阻塞判断失败,干活,C 消费一件商品,商品数量为 0,然后唤醒所有 wait 的线程。由于 A 和 B 开始处于阻塞状态,D 处于就绪状态,唤醒阻塞状态的线程后,这些线程的优先级更高。假设生产者 A 又运行,由于 A 是被唤醒的,且采用 if
判断,不会进行二次判断,直接干活,商品数量加一。生产者 B 也运行,与 A 同理,不会进行二次判断,商品数量又加一。而这,就是虚假唤醒!
新版消费者生产者写法
JDK8 文档中有这样一段话:
大致的意思就是提供了一个 Condition
对象,那这对象又是什么?
简单来说,Lock
替换了同步方法和语句的使用,而 Condition
替换了 Object
监视器方法的使用,在文档中也给出了创建一个 Condition
对象的方法。
Condition
是一个接口,其中也有一些方法:
上图圈出的方法就是替代的方法,我们可以得出下图来理解:
常见面试题:手写单例模式、手写冒泡排序算法、手写生产者消费者问题。
新版写法代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 public class ThreadWaitNotifyDemo3 { public static void main (String[] args) { AirConditioner3 airConditioner3 = new AirConditioner3 (); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { airConditioner3.increment(); } }, "A" ).start(); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { airConditioner3.decrement(); } }, "B" ).start(); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { airConditioner3.increment(); } }, "C" ).start(); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { airConditioner3.decrement(); } }, "D" ).start(); } } class AirConditioner3 { private int number = 0 ; private Lock lock = new ReentrantLock (); private Condition condition = lock.newCondition(); public void increment () { lock.lock(); try { while (number != 0 ) { condition.await(); } number++; System.out.println(Thread.currentThread().getName() + "\t" + number); condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement () { lock.lock(); try { while (number == 0 ) { condition.await(); } number--; System.out.println(Thread.currentThread().getName() + "\t" + number); condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
精确通知顺序访问
我们使用 Lock
代替了 synchronized
,使用 await
和 signal
代替了 wait
和 notify
,那为什么要这样代替呢?很简单,新版的东西代替旧版,不仅可以干旧版能干的事,还可以干旧版不能干的事。
假设现在我们需要精确打击:有三个线程,分别叫 A、B 和 C。A 线程打印 5 次,然后 B 线程打印 10 次,最后 C 线程打印 10 次,按照这个顺序来 10 轮。
这个需求和前面的案例就不同了,因为线程的调用有顺序了,对此,我们可以引入一个标志位。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 public class ThreadOrderAccess { public static void main (String[] args) { ShareSource shareSource = new ShareSource (); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { shareSource.print5(); } }, "A" ).start(); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { shareSource.print10(); } }, "B" ).start(); new Thread (() -> { for (int i = 0 ; i < 10 ; i++) { shareSource.print15(); } }, "C" ).start(); } } class ShareSource { private int number = 1 ; private final Lock lock = new ReentrantLock (); private final Condition condition1 = lock.newCondition(); private final Condition condition2 = lock.newCondition(); private final Condition condition3 = lock.newCondition(); public void print5 () { lock.lock(); try { while (number != 1 ) { condition1.await(); } for (int i = 0 ; i < 5 ; i++) { System.out.println(Thread.currentThread().getName() + "\t" + i); } number = 2 ; condition2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void print10 () { lock.lock(); try { while (number != 2 ) { condition2.await(); } for (int i = 0 ; i < 10 ; i++) { System.out.println(Thread.currentThread().getName() + "\t" + i); } number = 3 ; condition3.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void print15 () { lock.lock(); try { while (number != 3 ) { condition3.await(); } for (int i = 0 ; i < 15 ; i++) { System.out.println(Thread.currentThread().getName() + "\t" + i); } number = 1 ; condition1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
运行后,以下操作循环十次:线程 A 打印 0 ~ 4,线程 B 打印 0 ~ 9,线程 C 打印 0 ~14。
3. 八锁的现象
1、标准访问,先打印邮件还是短信?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package com.yang.lock8;public class Lock8 { public static void main (String[] args) throws Exception { Phone phone = new Phone (); new Thread (() -> { try { phone.sendEmail(); } catch (Exception e) { e.printStackTrace(); } }, "A" ).start(); Thread.sleep(100 ); new Thread (() -> { try { phone.sendSMS(); } catch (Exception e) { e.printStackTrace(); } }, "B" ).start(); } } class Phone { public synchronized void sendEmail () throws Exception { System.out.println("------> sendEmail" ); } public synchronized void sendSMS () throws Exception { System.out.println("------> sendSMS" ); } }
结论:先打印邮件,再打印短信。
原因:被 synchronized
修饰的方法,锁的对象是方法的调用者。两个方法的调用者是同一个,因此两个方法用的是同一个锁,先调用方法的先执行。
2、邮件方法暂停 4 秒,先打印邮件还是短信?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class Lock8 { public static void main (String[] args) throws Exception { Phone phone = new Phone (); new Thread (() -> { try { phone.sendEmail(); } catch (Exception e) { e.printStackTrace(); } }, "A" ).start(); Thread.sleep(100 ); new Thread (() -> { try { phone.sendSMS(); } catch (Exception e) { e.printStackTrace(); } }, "B" ).start(); } } class Phone { public synchronized void sendEmail () throws Exception { try { TimeUnit.SECONDS.sleep(4 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("------> sendEmail" ); } public synchronized void sendSMS () throws Exception { System.out.println("------> sendSMS" ); } }
结论:先打印邮件,再打印短信。
原因:原因和示例一同理,就算第一个线程被暂停了 4 秒,第二个线程的执行也只能在第一个线程之后。
3、新增一个普通方法,先打印邮件还是普通方法?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class Lock8 { public static void main (String[] args) throws Exception { Phone phone = new Phone (); new Thread (() -> { try { phone.sendEmail(); } catch (Exception e) { e.printStackTrace(); } }, "A" ).start(); Thread.sleep(100 ); new Thread (() -> { try { phone.hello(); } catch (Exception e) { e.printStackTrace(); } }, "B" ).start(); } } class Phone { public synchronized void sendEmail () throws Exception { try { TimeUnit.SECONDS.sleep(4 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("------> sendEmail" ); } public synchronized void sendSMS () throws Exception { System.out.println("------> sendSMS" ); } public void hello () { System.out.println("------> hello" ); } }
结论:先打印普通方法再打印邮件。
原因:新增的方法没有 synchronized
修饰,不是同步方法,不受锁的影响,不需要等待,直接执行。
4、两部手机,先打印邮件还是短信?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class Lock8 { public static void main (String[] args) throws Exception { Phone phone = new Phone (); Phone phone2 = new Phone (); new Thread (() -> { try { phone.sendEmail(); } catch (Exception e) { e.printStackTrace(); } }, "A" ).start(); Thread.sleep(100 ); new Thread (() -> { try { phone2.sendSMS(); } catch (Exception e) { e.printStackTrace(); } }, "B" ).start(); } } class Phone { public synchronized void sendEmail () throws Exception { try { TimeUnit.SECONDS.sleep(4 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("------> sendEmail" ); } public synchronized void sendSMS () throws Exception { System.out.println("------> sendSMS" ); } public void hello () { System.out.println("------> hello" ); } }
结论:先打印短信再打印邮件。
原因:synchronized
修饰的方法,锁的对象是方法的调用者。因为有两个对象调用方法,两个方法的调用者不是同一个,相当于有两个锁,各执行各的就行。因为第一个线程执行方法时被阻塞了 4 秒,因此第二个线程先执行。
5、两个静态同步方法,同一部手机,先打印邮件还是短信?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public class Lock8 { public static void main (String[] args) throws Exception { Phone phone = new Phone (); Phone phone2 = new Phone (); new Thread (() -> { try { phone.sendEmail(); } catch (Exception e) { e.printStackTrace(); } }, "A" ).start(); Thread.sleep(100 ); new Thread (() -> { try { phone.sendSMS(); } catch (Exception e) { e.printStackTrace(); } }, "B" ).start(); } } class Phone { public static synchronized void sendEmail () throws Exception { try { TimeUnit.SECONDS.sleep(4 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("------> sendEmail" ); } public static synchronized void sendSMS () throws Exception { System.out.println("------> sendSMS" ); } public void hello () { System.out.println("------> hello" ); } }
结论:先打印邮件再打印短信。
原因:被 synchronized
和 static
修饰的方法,锁的对象不是实例对象,而是 Class 对象,即:Phone.class
。因为两个同步方法都被 static
修饰了,所以两个方法用的是同一个锁,先调用的先执行。
6、两个静态同步方法,两部手机,先打印邮件还是短信?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public class Lock8 { public static void main (String[] args) throws Exception { Phone phone = new Phone (); Phone phone2 = new Phone (); new Thread (() -> { try { phone.sendEmail(); } catch (Exception e) { e.printStackTrace(); } }, "A" ).start(); Thread.sleep(100 ); new Thread (() -> { try { phone2.sendSMS(); } catch (Exception e) { e.printStackTrace(); } }, "B" ).start(); } } class Phone { public static synchronized void sendEmail () throws Exception { try { TimeUnit.SECONDS.sleep(4 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("------> sendEmail" ); } public static synchronized void sendSMS () throws Exception { System.out.println("------> sendSMS" ); } public void hello () { System.out.println("------> hello" ); } }
结论:先打印邮件再打印短信。
原因:原因和实例六一样,纵使有两个对象,但是 Class 对象只有一个(Phone.class 只有一个,或者说模板只有一个),锁的也是这个对象,只有一个锁,因此先调用的先执行。
7、一个普通同步方法,一个静态同步方法,同一部手机,先打印邮件还是短信?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public class Lock8 { public static void main (String[] args) throws Exception { Phone phone = new Phone (); Phone phone2 = new Phone (); new Thread (() -> { try { phone.sendEmail(); } catch (Exception e) { e.printStackTrace(); } }, "A" ).start(); Thread.sleep(100 ); new Thread (() -> { try { phone.sendSMS(); } catch (Exception e) { e.printStackTrace(); } }, "B" ).start(); } } class Phone { public static synchronized void sendEmail () throws Exception { try { TimeUnit.SECONDS.sleep(4 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("------> sendEmail" ); } public synchronized void sendSMS () throws Exception { System.out.println("------> sendSMS" ); } public void hello () { System.out.println("------> hello" ); } }
结论:先打印短信再打印邮件。
原因:静态同步方法锁的对象是 Class 对象,普通同步方法锁的是方法的调用者。两个锁的对象不是同一个,后调用的方法 不必等待 先调用的方法执行完毕后才执行。
8、一个普通同步方法,一个静态同步方法,两部手机,先打印邮件还是短信?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class Lock8 { public static void main (String[] args) throws Exception { Phone phone = new Phone (); Phone phone2 = new Phone (); new Thread (() -> { try { phone.sendEmail(); } catch (Exception e) { e.printStackTrace(); } }, "A" ).start(); Thread.sleep(100 ); new Thread (() -> { try { phone2.sendSMS(); } catch (Exception e) { e.printStackTrace(); } }, "B" ).start(); } } class Phone { public static synchronized void sendEmail () throws Exception { try { TimeUnit.SECONDS.sleep(4 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("------> sendEmail" ); } public synchronized void sendSMS () throws Exception { System.out.println("------> sendSMS" ); } public void hello () { System.out.println("------> hello" ); } }
结论:先打印短信再打印邮件。
原因:原因和实例七一样。需要注意的是,虽然有两个实例对象,但是静态同步方法锁的对象是 Class 对象,而不是方法的调用者。
总结
new、this 可以理解为同一部手机,而静态 class 相当于唯一的一个模板。
一个对象里面如果有多个 synchronized
方法,某一个时刻内,只要一个线程去调用其中的一个 synchronized
方法了,其它的线程都只能等待。换句话说,某一个时刻内,只能有唯一一个线程去访问这些 synchronized
方法,即:锁的是当前对象 this
,被锁定后,其它的线程都不能进入到当前对象的其它的 synchronized
方法。
加个普通方法后发现和同步锁无关,换成两个对象后,不是同一把锁了,情况立刻变化。
都换成静态同步方法后,情况又变化,所有的非静态同步方法用的都是同一把锁,即:实例对象本身。
synchronized
实现同步的基础:Java 中的每一个对象都可以作为锁。具体表现为以下3种形式:
当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。
也就是说如果一个实例对象的普通同步方法获取锁后,该实例对象的其他普通同步方法必须等待获取锁的方法释放锁后才能获取锁,可是别的实例对象的普通同步方法因为跟该实例对象的普通同步方法用的是不同的锁,所以无需等待该实例对象已获取锁的普通同步方法释放锁就可以获取它们自己的锁。
所有的静态同步方法用的也是同一把锁,即:类对象本身。
这两把锁(this / Class)是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞争条件的。但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同步方法之间,只要它们同一个类的实例对象!
4. 集合类不安全
4.1 List 不安全
单线程情况下
1 2 3 4 5 6 public static void main (String[] args) { List<String> list = Arrays.asList("a" , "b" , "c" ); for (String s : list) { System.out.println(s); } }
多线程情况下
3 个线程执行 List 的添加和打印:
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) { List<String> list = new ArrayList <>(); for (int i = 0 ; i < 3 ; i++) { new Thread (()->{ list.add(UUID.randomUUID().toString().substring(0 , 8 )); System.out.println(list); }, String.valueOf(i)).start(); } }
运行后,结果“丰富多彩”,数据有对的也有错的(有数据不一致的情况发生),但是程序一般不会报错。
30 个线程执行 List 的添加和打印:
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) { List<String> list = new ArrayList <>(); for (int i = 0 ; i < 30 ; i++) { new Thread (()->{ list.add(UUID.randomUUID().toString().substring(0 , 8 )); System.out.println(list); }, String.valueOf(i)).start(); } }
运行后,有可能会出现以下错误:
在多线程情况下,出现了 ConcurrentModificationException
异常。
最主要的是,ArrayList
是线程不安全的,我们可以使用 Vector
来代替,这个类是线程安全的,这个类的方法上加了 synchronized
关键字。
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) { List<String> list = new Vector <>(); for (int i = 0 ; i < 30 ; i++) { new Thread (()->{ list.add(UUID.randomUUID().toString().substring(0 , 8 )); System.out.println(list); }, String.valueOf(i)).start(); } }
还可以使用 Collections
工具类中的方法,那些方法可以将线程不安全的集合转换为线程安全的集合。建议在小数据量的时候使用:
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) { List<String> list = Collections.synchronizedList(new ArrayList <>()); for (int i = 0 ; i < 30 ; i++) { new Thread (()->{ list.add(UUID.randomUUID().toString().substring(0 , 8 )); System.out.println(list); }, String.valueOf(i)).start(); } }
Vector
类在 JDK1.0 中就有了,而 Collections
是 JDK 1.2 中的东西,在实际的高并发多线程场景下,我们都不这么使用,而是使用 JDK 1.5 中 JUC 中的 CopyOnWriteArrayList
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class ListNotSafe { public static void main (String[] args) { List<String> list = new CopyOnWriteArrayList <>(); for (int i = 0 ; i < 30 ; i++) { new Thread (()->{ list.add(UUID.randomUUID().toString().substring(0 , 8 )); System.out.println(list); }, String.valueOf(i)).start(); } } }
使用 ArrayList
时,读数据可以多个读,但是写数据就会出现问题;使用 Vector
时,使用了 synchronized
关键字,保证读写一致,读数据和写数据都只能一个线程去读写,效率低下。要解决这个问题,可以使用写入复制(COW,CopyOnWrite)的思想,采用 CopyOnWriteArrayList
类。
可以进入其源码查看一下 add()
方法的源码:
CopyOnWrite 容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器 Object[]
添加,而是先将当前容器 Object[]
进行Copy,复制出一个新的容器 Object[] newElements
, 然后往新的容器 Object[] newElements
里添加元素,添加完元素之后,再将原容器的引用指向新的容器 setArray(newElements);
。这样做的好处是可以对 CopyOnWrite 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite 容器也是一种读写分离的思想,读和写不同的容器。
在并发场景下,我们一般使用 CopyOnWriteArrayList
,而不用 Vector
,Vector
的增删改查方法都加了synchronized
,以保证同步,但是每个方法执行的时候都要去获得锁,性能就会大大下降,而CopyOnWriteArrayList
只是在增删改上加锁,但是读不加锁,在读方面的性能就好于 Vector
,支持读多写少的并发情况。
4.2 Set 不安全
与 List 一样,在 30个线程执行添加和打印的情况下,可能会出现 ConcurrentModificationException
异常。
解决方法也差不多,可以使用:
1 Set<String> set = Collections.synchronizedSet(new HashSet <>());
当然 JUC 中也有对应的类,名为:CopyOnWriteArraySet
。
1 2 3 4 5 6 7 8 9 10 11 public class SetNotSafe { public static void main (String[] args) { Set<String> set = new CopyOnWriteArraySet <>(); for (int i = 0 ; i < 30 ; i++) { new Thread (()->{ set.add(UUID.randomUUID().toString().substring(0 , 8 )); System.out.println(set); },String.valueOf(i)).start(); } } }
HashSet 的底层
HashSet 的底层就是 HashMap,点进源码:
1 2 3 public HashSet () { map = new HashMap <>(); }
查看 HashSet 的 add()
方法:
1 2 3 public boolean add (E e) { return map.put(e, PRESENT)==null ; }
可以看到就是调用了 HashMap 的 put()
方法,而 PRESENT
只是个常量,源码中有:
1 2 private static final Object PRESENT = new Object ();
4.3 Map 不安全
HashMap 回顾
1 2 3 Map<String, String> map1 = new HashMap <>(); Map<String, String> map2 = new HashMap <>(16 , 0.75f );
上面两种写法是等价的,可以在创建 HashMap
对象时就指定初始容量和负载因子。
在实际开发中,一般会指定初始容量,减少后续扩容次数,以增加性能。
HashMap 不安全
1 2 3 4 5 6 7 8 9 10 11 12 public class MapNotSafe { public static void main (String[] args) { Map<String, String> map = new HashMap <>(); for (int i = 0 ; i < 30 ; i++) { new Thread (()->{ map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0 , 8 )); System.out.println(map); }, String.valueOf(i)).start(); } } }
运行上述代码后,也有可能会出现 ConcurrentModificationException
异常。
解决方法也和 List、Set 类似,比如:
1 Map<String, String> map = Collections.synchronizedMap(new HashMap <>());
同样在 JUC 包中也有这样一个类,名为:ConcurrentHashMap
1 2 3 4 5 6 7 8 9 10 11 public static void main (String[] args) { Map<String, String> map = new ConcurrentHashMap <>(); for (int i = 0 ; i < 30 ; i++) { new Thread (()->{ map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0 , 8 )); System.out.println(map); }, String.valueOf(i)).start(); } }
注意区分 ConcurrentHashMap
与前两个类的区别。
注意复习 HashMap 的底层结构,参考【数据结构之哈希表】。
5. Callable
Callable 与 Runnable 的区别
1、重写的方法不同,一个是 call()
,一个是 run()
2、call()
方法抛出了异常,而 run()
方法没有
3、call()
方法有返回值,而 run()
方法没有
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class MyThread implements Runnable { @Override public void run () { } } class MyThread2 implements Callable <Integer> { @Override public Integer call () throws Exception { return null ; } }
基本使用
Callable
是一个带泛型的接口,既然是接口,那么就需要被实现。因此有:
1 2 3 4 5 6 7 8 class MyThread implements Callable <Integer> { @Override public Integer call () throws Exception { System.out.println("******come in here" ); return 1024 ; } }
那就下来怎么做?当然是将实现了 Callable
接口的对象传入 Thread
中,看一下 Thread
的构造方法:
发现没有传入 Callable
的构造方法,那怎么办?
我们可以传入一个 Runnable
的子类,这个类不仅和 Runnable
有关,还和 Callable
有关,相当于在两个之间搭了个桥。进入 Runnable
接口查看:
进入子类 RunnableFuture
:
查看其实现类 FutureTask
,并查看其构造方法:
在此,我们终于发现了连接 Runnable
与 Callable
的桥梁。
简单来说,Thread
类中没有传入 Callable
的构造方法,但是有 Runnable
的构造方法。因此可以传入一个 FutureTask
对象,这个类连接了 Callable
与 Runnable
。那么则有:
1 2 3 4 5 6 7 public class CallableDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> task = new FutureTask <>(new MyThread ()); new Thread (task, "A" ).start(); System.out.println(task.get()); } }
运行后,打印出:
Callable 细节
Callable
是 异步 执行的。
获取 Callable
中 call()
方法的返回值可以使用 get()
方法,但是 get()
方法一般放在方法的最后一行,否则可能会引起阻塞,就丧失了异步执行的优点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class CallableDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> task = new FutureTask <>(new MyThread ()); new Thread (task, "A" ).start(); new Thread (task, "B" ).start(); System.out.println(Thread.currentThread().getName() + " ******计算完成" ); System.out.println(task.get()); } } class MyThread implements Callable <Integer> { @Override public Integer call () throws Exception { System.out.println("******come in Callable" ); try { TimeUnit.SECONDS.sleep(4 ); } catch (InterruptedException e) { e.printStackTrace(); } return 1024 ; } }
开一个新线程,但是使用的同一个 FutureTask
对象时,会有结果缓存,不会二次打印结果。
上述代码运行结果:
6. 强大的辅助类
6.1 CountDownLatch
假设现在有一个需求:有六个线程和一个主线程共七个线程,每个线程执行一条打印语句,要求六个线程执行完打印语句后主线程才执行打印语句。
针对这个问题,可以使用 JUC 下的 CountDownLatch
类,具体使用如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class CountDownLatchDemo { public static void main (String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (6 ); for (int i = 0 ; i < 6 ; i++) { new Thread (() -> { System.out.println(Thread.currentThread().getName() + "\t离开教室" ); countDownLatch.countDown(); }, String.valueOf(i)).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName() + "\t班长关门走人" ); } }
运行结果:
适用场景:一个线程等待一批线程达到同步点,之后继续进行。
基本原理
CountDownLatch
主要有两个方法,当一个或多个线程调用 await()
方法时,这些线程会阻塞
其它线程调用 countDown()
方法会将计数器减 1(调用 countDown()
方法的线程不会阻塞)
当计数器的值变为 0 时,因 await()
方法阻塞的线程会被唤醒,继续执行
需要注意的是: 计数器不能重用。
6.2 CyclicBarrier
CyclicBarrier 的直译就是循环屏障,谷歌翻译为:篱栅。
这个类与 CountDownLatch
相反,CountDownLatch
是指定一个数然后依次减少,而 CyclicBarrier
是指定一个数,从 0 开始增加,增加到指定的数后才执行。
就好比:收集龙珠,必须要集齐七颗龙珠才能召唤神龙一样。
构造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class CyclicBarrierDemo { public static void main (String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier (7 , () -> { try { TimeUnit.SECONDS.sleep(4 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " 集齐七颗龙珠召唤神龙" ); }); for (int i = 1 ; i <= 7 ; i++) { final int tempInt = i; new Thread (() -> { System.out.println(Thread.currentThread().getName() + "\t收集到" + tempInt + "颗龙珠" ); try { cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }, String.valueOf(i)).start(); } } }
运行结果如下:
最后一条打印语句的线程名不固定。
CountDownLatch 与 CyclicBarrier 的区别
根据上面两个案例,我们发现 CountDownLatch
与 CyclicBarrier
似乎差不多,其实这俩还是有区别的,区别如下:
CountDownLatch
CyclicBarrier
减计数方式
加计数方式
计数为 0 时,无法重置,无法重复利用
计数达到指定值时,计数置为 0 可重新开始,可重复使用
调用 countDown()
方法计数减一,调用 await()
方法只进行阻塞,对计数没任何影响
调用 await()
方法计数加 1,若加 1 后的值不等于构造方法的值,则线程阻塞
CountdownLatch
阻塞主线程,等所有子线程都执行了且达到执行数量后再执行主线程。
CyclicBarrier
阻塞一组线程,直至某个状态之后再全部同时执行,并且所有线程都被释放后,还能通过 reset()
来重用,这也是 Cyclic 一词的体现。
如果使用 CyclicBarrier(int parties, Runnable barrierAction)
构造函数,调用 await()
方法的次数必须等于 parties 后,构造方法中 Runnable 接口重写的 run()
方法才会执行,否则一直不执行。
参考链接:循环屏障CyclicBarrier以及和CountDownLatch的区别
6.3 Semaphore
Semaphore [ˈseməfɔː(r)]
可译为:信号、信号量。
模拟一个抢车位的场景:只有三个车位,但是有 6 辆车,每辆车都要停进去并出来一次。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class SemaphoreDemo { public static void main (String[] args) { Semaphore semaphore = new Semaphore (3 ); for (int i = 0 ; i < 6 ; i++) { new Thread (() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "\t抢占到了车位" ); TimeUnit.SECONDS.sleep(3 ); System.out.println(Thread.currentThread().getName() + "\t离开了车位" ); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }, String.valueOf(i)).start(); } } }
根据以上的 Demo,在实际情况下可用于:
原理
在信号量上我们定义两种操作:
acquire(获取)当一 个线程调用 acquire 操作时,它要么通过成功获取信号量(信号量减 1),要么一直等下去,直到有线程释放信号量,或超时。
release(释放)实际上会将信号量的值加 1,然后唤醒等待的线程。
信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
拓展思考:Semaphore semaphore = new Semaphore(1);
与 synchronized
效果相同,假设要求某一线程在固定的时间内持有某一资源就可以使用这种方法。
7. 读写锁
读写锁,即:ReadWriteLock
,JDK 官方 API 文档是这么说的:
对于 Lock 来说,无论是读还是写都只能有一个线程进入,这样的效率不高。
对于读来说,可以多个线程进行读,但是为了数据一致性,写操作只能有一个线程。
案例分析
不使用读写锁时:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class ReadWriteLockDemo { public static void main (String[] args) { MyCache myCache = new MyCache (); for (int i = 0 ; i < 5 ; i++) { final int tempInt = i; new Thread (()->{ myCache.put(tempInt+ "" , tempInt+"" ); }, String.valueOf(i)).start(); } for (int i = 0 ; i < 5 ; i++) { final int tempInt = i; new Thread (()->{ myCache.get(tempInt+ "" ); }, String.valueOf(i)).start(); } } } class MyCache { private volatile Map<String, Object> map = new HashMap <>(); public void put (String key, Object value) { System.out.println(Thread.currentThread().getName() +"\t开始写入" + key); try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { e.printStackTrace(); } map.put(key, value); System.out.println(Thread.currentThread().getName() +"\t------写入成功" ); } public void get (String key) { System.out.println(Thread.currentThread().getName() +"\t开始读取" ); try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { e.printStackTrace(); } Object result = map.get(key); System.out.println(Thread.currentThread().getName() +"\t读取成功" + result); } }
运行上述代码后,会出现一个很大的错误:才刚开始写入,还没写入完成时,就已经开始读取了。
针对这个问题,可以使用读写锁,读在写之后,开始写入和成功写入成对出现。
根据 API 文档,ReadWriteLock
是一个接口,使用时需要使用它的实现类 ReentrantReadWriteLock
。基于此,可将 MyCache
类修改成如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class MyCache { private volatile Map<String, Object> map = new HashMap <>(); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock (); public void put (String key, Object value) { readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + "\t开始写入" + key); try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { e.printStackTrace(); } map.put(key, value); System.out.println(Thread.currentThread().getName() + "\t------写入成功" ); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); } } public void get (String key) { readWriteLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + "\t开始读取" ); try { TimeUnit.MILLISECONDS.sleep(300 ); } catch (InterruptedException e) { e.printStackTrace(); } Object result = map.get(key); System.out.println(Thread.currentThread().getName() + "\t读取成功" + result); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); } } }
再次运行代码后,符合我们的要求。
概念拓展
引入两个概念:排他锁与共享锁。
排他锁: 排他锁(exclusive locks)也叫独占锁、写锁,简称 X 锁,是指该锁一次只能被一个线程所持有。如果线程 T 对数据 A 加上排他锁后,则其他线程不能再对 A 加任何类型的锁。获得排它锁的线程即能读数据又能修改数据。JDK 中的 synchronized
和 JUC 中 Lock
的实现类就是互斥锁。
共享锁: 共享锁(share locks)又称为读锁,简称 S 锁,是指该锁可被多个线程所持有。如果线程 T 对数据 A 加上共享锁后,则其他线程只能对 A 再加共享锁,不能加排它锁。获得共享锁的线程只能读数据,不能修改数据。 独享锁与共享锁也是通过 AQS 来实现的,通过实现不同的方法,来实现独享或者共享。
参考链接:独占锁与共享锁
8. 阻塞队列
8.1 阻塞队列种类
BlockingQueue
,阻塞队列,阻塞队列是一个队列,在数据结构中的作用如下图:
线程 1 往阻塞队列里添加元素,线程 2 从阻塞队列中移除元素。
阻塞:必须要阻塞 / 不得不阻塞。
阻塞队列特点
当队列是空的,从队列中 获取 元素的操作将会被阻塞。
当队列是满的,从队列中 添加 元素的操作将会被阻塞。
试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素。
试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增。
阻塞队列用处
在多线程领域:所谓阻塞,在某些情况下会 挂起 线程(即阻塞),一旦条件满足,被挂起的线程又会自动 被唤起
为什么需要 BlockingQueue
?
好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切 BlockingQueue
都给你一手包办了。
在 concurrent 包发布以前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
阻塞队列的种类
ArrayBlockingQueue :由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :由链表结构组成的有界(但大小默认值为 integer.MAX_VALUE
)阻塞队列。
PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
SynchronousQueue :不存储元素的阻塞队列,也即单个元素的队列。
LinkedTransferQueue:由链表组成的无界阻塞队列。
LinkedBlockingDeque:由链表组成的双向阻塞队列。
8.2 阻塞队列种类 API
核心方法:
方法类型
抛出异常
特殊值
阻塞
超时
插入
add(e)
offer(e)
put(e)
offer(e, time, unit)
移除
remove()
poll()
take()
poll(time, unit)
检查
element()
peek()
不可用
不可用
异常抛出测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static void exceptionTest () { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue <>(3 ); System.out.println(blockingQueue.add("a" )); System.out.println(blockingQueue.add("b" )); System.out.println(blockingQueue.add("c" )); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.element()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); }
特殊值测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public static void specialValueTest () { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue <>(3 ); System.out.println(blockingQueue.offer("a" )); System.out.println(blockingQueue.offer("b" )); System.out.println(blockingQueue.offer("c" )); System.out.println(blockingQueue.offer("x" )); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.peek()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); }
阻塞测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void blockingTest () throws InterruptedException { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue <>(3 ); blockingQueue.put("a" ); blockingQueue.put("b" ); blockingQueue.put("c" ); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); }
超时测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void timeoutTest () throws InterruptedException { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue <>(3 ); System.out.println(blockingQueue.offer("a" )); System.out.println(blockingQueue.offer("b" )); System.out.println(blockingQueue.offer("c" )); System.out.println(blockingQueue.offer("x" , 3L , TimeUnit.SECONDS)); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll(2L , TimeUnit.SECONDS)); }
前面所示的核心方法最好按照给出的测试案例一样成对出现。
SynchronousQueue 同步队列
SynchronousQueue
没有容量。与其他的 BlockingQueue
不同,SynchronousQueue
是一个不存储元素的 BlockingQueue 。
每一个 put 操作必须要等待一个 take 操作,否则不能继续添加元素。
假设有两个线程,一个线程只对 SynchronousQueue 中添加三个元素,而另一个线程只对 SynchronousQueue 移除三个元素,那么一定是添加一个移除一个,如此三轮。
9. 线程池
9.1 线程池的优势
为什么要用线程池:
10 年前单核 CPU 电脑,是假的多线程,像马戏团的小丑玩多个球 ,CPU 需要来回切换。
如今是多核电脑,多个线程各自跑在独立的 CPU 上,不用切换效率高。
线程池的优势:
线程池做的工作主要是:控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
它的主要特点为: 线程复用 、控制最大并发数 、管理线程 。
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行。
第三:提高线程的可管理性,线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优和监控。
9.2 三大方法
Java 中的线程池是通过 Executor
框架实现的,该框架中用到了 Executor
,Executors
,
ExecutorService
,ThreadPoolExecutor
这几个类。
三大方法
Executors.newFixedThreadPool(int nThreads)
:
执行长期任务性能好,创建一个线程池,一池有 N 个固定的线程,有固定线程数的线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static void methodTest1 () { ExecutorService threadPool = Executors.newFixedThreadPool(5 ); try { for (int i = 0 ; i < 10 ; i++) { threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "\t办理业务" ); }); TimeUnit.MILLISECONDS.sleep(400 ); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } }
Executors.newSingleThreadExecutor()
:
创建一个线程池,这个线程池里有且仅有一个线程。一个任务一个任务地执行,一池一线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static void methodTest2 () { ExecutorService threadExecutor = Executors.newSingleThreadExecutor(); try { for (int i = 0 ; i < 10 ; i++) { threadExecutor.execute(() -> { System.out.println(Thread.currentThread().getName() + "\t办理业务" ); }); TimeUnit.MILLISECONDS.sleep(400 ); } } catch (Exception e) { e.printStackTrace(); } finally { threadExecutor.shutdown(); } }
Executors.newCachedThreadPool()
:
执行很多短期异步任务,线程池根据需要创建新线程,但在先前构建的线程可用时将重用它们,可扩容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static void methodTest3 () { ExecutorService threadPool = Executors.newCachedThreadPool(); try { for (int i = 0 ; i < 10 ; i++) { TimeUnit.SECONDS.sleep(1 ); threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "\t办理业务" ); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } }
9.3 ThreadPoolExecutor
点开三大方法后查看,会发现这三个方法都是调用的 ThreadPoolExecutor()
方法。
将其放在一张图上进行对比:
会发现一些熟悉的角色,比如 TimeUnit
类,又比如各种阻塞队列。
我们点开 ThreadPoolExecutor()
会发现有 7 个参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException (); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException (); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
下面对这 7 个参数进行一一讲解。 👊
ThreadPoolExecutor 的七大参数
corePoolSize
:线程池中的常驻核心线程数。
maximumPoolSize
:线程池中能够容纳同时执行的最大线程数,这个值必须大于等于 1。
keepAliveTime
:多余的空闲线程的存活时间。当前池中线程数量超过 corePoolSize
时,且空闲时间达到 keepAliveTime
时,多余线程会被销毁直到剩下 corePoolSize
个线程为止。
unit
: keepAliveTime 的单位。单位有:
1 2 3 4 5 6 7 TimeUnit.DAYS; TimeUnit.HOURS; TimeUnit.MINUTES; TimeUnit.SECONDS; TimeUnit.MILLISECONDS; TimeUnit.MICROSECONDS; TimeUnit.NANOSECONDS;
workQueue
:任务队列,被提交但尚未被执行的任务。有 ArrayBlockingQueue
、
LinkedBlockingQueue
、SynchronousQueue
三种可选。
threadFactory
:表示生成线程池中工作线程的线程工厂,用于创建线程,一般使用默认的即可
handler
:拒绝策略。表示当队列满了,并且工作线程大于等于线程池的最大线程数(maximumPoolSize)时如何来拒绝请求执行的 runnable 的策略。
9.4 线程池底层工作原理
链接分享:
线程池的简介及底层原理
深入源码分析Java线程池的实现原理
ThreadPoolExecutor
方法分析图示:
主要处理流程:
显示例子描述
线程池就相当于一个银行网点,假设银行中共用 5 个办理窗口(maximumPoolSize),但是平时只有 2 个办理窗口(corePoolSize)一直工作,还有 3 个位置的候客区(workQueue)。
假设来了两个用户需要办理业务,那么就去常用的两个窗口办理业务。然后又来了三个用户,由于先前两个用户还在办理业务,那么就需要将新来了三个用户安置在候客区,等待先前来的两个用户办理完。
突然又来了三个用户,常用窗口已满,候客区已满,窗口进行扩容,增加 3 个窗口,扩容至五个,候客区的三个用户先办理用户,新来的三个用户安置在候客区。
这时,5 个窗口全部都在使用,候客区也满了,突然又来了两位用户,那么就会按照策略(handler)处理这两位用户。
当增加的三个窗口在单位为 unit,数值为 keepAliveTime 的时间之后仍然没有用户使用,那么就会进行缩容,将可办理业务窗口数量又重置为 2 个。
文字描述
1、在创建了线程池后,开始等待请求。
2、当调用 execute()
方法添加一个请求任务时,线程池会做出如下判断:
如果正在运行的线程数量小于 corePoolSize
,那么马上创建线程运行这个任务:
如果正在运行的线程数量大于或等于 corePoolSize
,那么将这个任务放入队列:
如果这个时候队列满了且正在运行的线程数量还小于 maximumPoolSize
,那么还是要创建非
核心线程立刻运行这个任务;
如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize
,那么线程池会启动饱和拒绝策略来执行。
3、当一个线程完成任务时,它会从队列中取下一个任务来执行。
4、当一个线程无事可做超过一定的时间(keepA1iveTime)时,线程会判断:
9.5 线程池的使用
前文说到,线程池的创建方式有三种:单一的、固定数的和可变数量的。那么工作中用哪种呢?
实际情况是都不用!工作中只能使用自定义的。
但是 JDK 都提供了 Executors
,为啥不用呢?
阿里巴巴开发手册中有这样一段话:
OOM:OutOfMemory,内存溢出。
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static void createMyThreadPool () { ExecutorService threadPool = new ThreadPoolExecutor ( 2 , 5 , 2L , TimeUnit.SECONDS, new LinkedBlockingQueue <>(3 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor .AbortPolicy()); try { for (int i = 0 ; i < 8 ; i++) { threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "\t办理业务" ); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } }
9.6 拒绝策略
当等待队列已经排满了,再也塞不下新任务了,同时线程池中的 max 线程也达到了,无法继续为新任务服务。这个时候就需要拒绝策略机制合理地处理这个问题。
内置拒绝策略
AbortPolicy:直接抛出 RejectedExecutionException
异常阻止系统正常运行
CallerRunsPolicy:"调用者运行"一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量
DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务
DiscardPolicy:该策略默默地丢弃无法处理的任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的拒绝策略
以上内置拒绝策略都实现了 RejectedExecutionHandler
接口。
测试代码
对于 CallerRunsPolicy 拒绝策略:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class MyThreadPool { public static void main (String[] args) { createMyThreadPool(); } public static void createMyThreadPool () { ExecutorService threadPool = new ThreadPoolExecutor ( 2 , 5 , 2L , TimeUnit.SECONDS, new LinkedBlockingQueue <>(3 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor .CallerRunsPolicy()); try { for (int i = 0 ; i < 10 ; i++) { threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "\t办理业务" ); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdown(); } } }
运行结果:
将上述代码的拒绝策略改为 DiscardPolicy 后,运行结果为:
会发现只有 8 个线程执行。
拓展
在上述代码中,我们规定线程池能够容纳的最大线程数是 5 ,这是怎么得出的呢?可以随便取吗?
针对一个 CPU 密集型的程序(以计算为主的程序),使用多线程时,最好充分发挥 CPU 的核心,我们的取值一般是 CPU 的核心数加一 。
太大也不行,频繁切换线程也会导致效率的丢失。
查看 CPU 核心的方法有很多,比如打开任务管理器 — 性能 — CPU,其中的逻辑处理器就是 CPU 核心数,还可以打开设备管理器查看。还可以通过一行代码查看:
1 System.out.println(Runtime.getRuntime().availableProcessors());
上述这行代码就可以输出 CPU 的核心数。
我的 CPU 的逻辑处理器数是 4,因此将最大线程数取值为 5。
针对一个 IO 密集型的程序(以网络或 IO 为主的程序),CPU 的消耗很少,任务的大部分时间都在等待 IO 操作完成(因为IO的速度远远低于CPU和内存的速度)。常见的大部分任务都是 IO 密集型任务,比如 Web 应用。对于 IO 密集型任务,任务越多,CPU 效率越高(但也有限度)。
一般配置是:CPU 总核心数 * 2 + 1
具体公式是:线程数 = CPU 总核心数 / (1 - 阻塞系数)
阻塞系数:线程花在系统 IO 上的时间与 CPU 密集任务所耗的时间比值
参考链接:
根据CPU核心数确定线程池并发线程数
线程池线程数目的确定
10. Java8 流式计算
10.1 四大函数式接口
Java 内置核心四大函数式接口:
函数式接口
参数类型
返回类型
用途
Consumer<T>
消费型接口
T
void
对类型为 T 的对象应用操作, 包含方法:void accept(T t)
Supplier<T>
供给型接口
无
T
返回类型为 T 的对象, 包含方法:T get()
Function<T, R>
函数型接口
T
R
对类型为 T 的对象应用操作,并返回结果。结果是 R 类型的对象。 包含方法:R apply(T t)
Predicate<T>
断定型接口
T
boolean
确定类型为 T 的对象是否满足某约束,并返回 boolean 值。 包含方法:boolean test(T t)
我们知道函数式接口可以使用 Lambda 表达式来书写,因此列举一下上面几个接口的实际应用。
函数型接口
1 2 3 4 public static void functionTest () { Function<String, Integer> function = s -> {return s.length();}; System.out.println(function.apply("abc" )); }
断定型接口
1 2 3 4 public static void predicateTest () { Predicate<String> predicate = s -> {return s.isEmpty();}; System.out.println(predicate.test("mofan" )); }
消费性接口
1 2 3 4 5 6 public static void consumerTest () { Consumer<String> consumer = s -> { System.out.println(s); }; consumer.accept("hello world" ); }
供给型接口
1 2 3 4 public static void supplierTest () { Supplier<String> supplier = ()->{return "Java No.1" ;}; System.out.println(supplier.get()); }
10.2 Stream 流式计算
流到底是什么
流是数据渠道,用于操作数据源(集合、数组等)所生成的元素序列。
“集合讲的是数据,流讲的是计算!”
流的特点:
Stream 自己不会存储元素
Stream 不会改变原对象。相反,它们会返回一个持有结果的新 Stream。
Stream 操作是延迟执行的。这意味着它们会等到需要结果的时候才执行。
怎么使用流?
创建一个 Stream:一个 数据源 (数组、集合)
中间操作:一个中间操作,处理 数据源数据
终止操作:一个终止操作,执行中间操作链,产生结果
基本使用
先给出一个实体类:
1 2 3 4 5 6 public class User { private int id; private String username; private int age; }
然后对数据进行过滤处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class StreamDemo { public static void main (String[] args) { User u1 = new User (11 , "a" , 23 ); User u2 = new User (12 , "b" , 24 ); User u3 = new User (13 , "c" , 22 ); User u4 = new User (14 , "d" , 28 ); User u5 = new User (16 , "e" , 26 ); List<User> list = Arrays.asList(u1, u2, u3, u4, u5); list.stream() .filter(user -> {return user.getId() % 2 == 0 ;}) .filter(user -> user.getAge() > 24 ) .map(user -> user.getUsername().toUpperCase()) .sorted((o1, o2) -> o2.compareTo(o1)) .limit(1 ) .forEach(System.out::println); } }
map()
似乎不好理解,可以通过一下代码理解:
1 2 3 4 5 6 7 8 9 10 11 public static void mapTest () { List<Integer> list1 = Arrays.asList(1 , 2 , 3 ); list1 = list1.stream().map(x -> { return x * 2 ; }).collect(Collectors.toList()); for (Integer element : list1) { System.out.println(element); } }
以上就是 Stream 流的基本操作,当然 Stream 流的 API 远不止这些,具体可以参考官方 API 或者其他博文。
Java 8 官方 API:Java™ Platform, Standard Edition 8
其他博文:Stream流的基本操作
那这 Stream 流有什么用呢?
当我们对数据库中的数据进行处理时,可以使用 Java 代码对原始数据进行处理得到我们想要的数据,从而化繁为简,使用 Java 的 API,而不是数据库内的 SQL 语句。
11. 分支合并
什么是分支合并
从 JDK1.7 开始,Java 提供 Fork / Join 框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最后汇总每个小任务的结果得到这个大任务的结果。这样的话在大数量的情况下可以提高效率。
这是一种分治的实现。
PS:如果熟悉归并排序相信不难理解,可以在本站搜索【归并排序】查看。
Fork/Join 框架与线程池的区别
1、Fork/Join 框架采用 “工作窃取” 模式 (work-stealing):当执行新的任务时它可以将其拆分成 更小的任务执行,并将小任务加到线程队列中,当没有任务执行时,再从一个随机线程的队列中偷一个并把它放在自己的队列中。
2、相对于一般的线程池实现 ,Fork / Join 框架的优势体现在对其中包含的任务的处理方式上,在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行那么该线程会处于等待状态。而在 Fork / Join 框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行,那么处理该子问题的线程会主动寻找其他尚未运行的子问题(窃取过来)来执行,这种方式减少了线程的等待时间,提高了性能。
参考链接:11.ForkJoinPool 分支/合并框架 (工作窃取)
工作窃取
JDK 1.7 引入的 Fork / Join 框架就是基于工作窃取算法,可以使用 LinkedBlockingDeque
来实现工作窃取算法。
那么什么是工作窃取?
工作窃取算法是指某个线程从其他队列里窃取任务来执行。那么,为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。
当 A 线程开始执行的时候,优先总是处理本地队列中的任务,当它发现本地队列已经空了,那么它会去全局队列中获取 Task,当全局队列中也是空的,那么就会发生工作窃取(work-stealing)。任务调度器会把该线程池中额外的任务分配给 A 线程处理,其效果就好比该线程会才从其他线程的队列中“窃取”一个 Task 来执行。这样的目的是提高了 CPU 的使用效率。
这种策略是任务调度器的默认策略,通常是不需要改变的。如果需要改变,需要在创建任务时,设置任务的 TaskCreationOptions.PreferFairness
。
优点: 充分利用线程进行并行计算,减少线程间的竞争。
缺点: 在某些情况下还是会存在竞争,比如双端队列里只有一个任务时,并且该算法会消耗更多的系统资源, 比如创建多个线程和多个双端队列。
参考链接:
工作窃取算法 work-stealing
Task的运行原理和工作窃取(work stealing)
示例代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class ForkJoinDemo { public static void main (String[] args) throws ExecutionException, InterruptedException { MyTask task = new MyTask (0 , 100 ); ForkJoinPool threadPool = new ForkJoinPool (); ForkJoinTask<Integer> forkJoinTask = threadPool.submit(task); System.out.println(forkJoinTask.get()); threadPool.shutdown(); } } class MyTask extends RecursiveTask <Integer> { private static final Integer ADJUST_VALUE = 10 ; private int begin; private int end; private int result; public MyTask (int begin, int end) { this .begin = begin; this .end = end; } @Override protected Integer compute () { if ((end - begin) <= ADJUST_VALUE) { for (int i = begin; i <= end; i++) { result = result + i; } } else { int middle = (end + begin) / 2 ; MyTask task01 = new MyTask (begin, middle); MyTask task02 = new MyTask (middle + 1 , end); task01.fork(); task02.fork(); result = task01.join() + task02.join(); } return result; } }
12. 异步回调
12.1 概述与示例
概述
所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操作继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不需要等待计算结果,但调用者仍需要取线程的计算结果。
简单来说,我在厨房炒菜,但是没菜没油,楼下有超市,超市卖油不卖菜,可以在美团上买菜。最终,我选择上美团买油,在快递员送货的时间里,可以前往楼下超市买油。这就是一种异步,或者说 Future 模式。
在 Java 5 中引入了 Future
接口,虽然 Future
以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果。
或许 Doug Lea 大神也无法忍受了,便编写了 CompletableFuture
。 在 Java8 中,CompletableFuture
提供了非常强大的 Future
的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture
的方法。
简单使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class CompletableFutureDemo { public static void main (String[] args) throws Exception { CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { System.out.println(Thread.currentThread().getName() + " 没有返回,update ok" ); }); completableFuture.get(); CompletableFuture<Integer> integerCompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() + " 有返回,insert ok" ); return 1024 ; }); Integer integer = integerCompletableFuture.whenComplete((t, h) -> { System.out.println("-------t= " + t); System.out.println("-------h= " + h); }).exceptionally(f -> { System.out.println("-------exception= " + f.getMessage()); return 444 ; }).get(); System.out.println(integer); } }
上述代码运行后:
将上述代码上的 int i = 10 / 0;
注释放开,模拟一个异常的产生:
12.2 API 的使用
创建方式
构建方法创建:
1 CompletableFuture<Object> future = new CompletableFuture <>();
这种方式创建的 CompletableFuture
没有放入任何任务,也没有添加任务返回结果。
可以调用 complete()
或 completeExceptionally()
方法,添加任务执行的返回值和抛出的异常信息。
如果要创建一个已经完成的任务,还可以通过提供的静态方法来创建:
1 2 3 public static <U> CompletableFuture<U> completedFuture (U value) { return new CompletableFuture <U>((value == null ) ? NIL : value); }
静态方法创建:
CompletableFuture
还提供了其他静态方法来创建:
1 2 3 4 5 public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier) ;public static <U> CompletableFuture<U> supplyAsync (Supplier<U> supplier, Executor executor) ;public static CompletableFuture<Void> runAsync (Runnable runnable) ;public static CompletableFuture<Void> runAsync (Runnable runnable, Executor executor) ;
两两一组,supplyAsync()
表示创建有返回值的异步任务,而 runAsync()
表示创建无返回值的异步任务, 这两个方法都存在重载形式,Executor
类型的参数表示可以指定执行异步任务时所用的线程池,否则使用提供的默认线程池。
使用静态方法创建 CompletableFuture
时,会立马开启线程执行异步任务。
获取任务执行结果
如果任务完成,就返回结果,否则一直阻塞直到任务完成:
1 public T get () throws InterruptedException, ExecutionException
如果任务完成,就返回结果,如果到了指定的超时时间还未完成,就抛出 TimeoutException
异常:
1 2 public T get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
如果任务完成,就返回结果,否则返回给定的 valueIfAbsent
值,不会产生阻塞:
1 public T getNow (T valueIfAbsent)
与 get()
方法类似,但不会抛出受检异常,如果任务执行过程中产生异常,则抛出非受检异常:
主动结束任务
如果当前任务已完成,返回 false
;如果当前任务没完成,主动触发当前任务的完成,返回 true
,其他线程获取到的任务执行结果是给定的 value
值:
1 public boolean complete (T value)
如果当前任务已完成,返回 false
;如果当前任务没完成,主动触发当前任务的完成,返回 true
,其他线程获取到的任务执行异常信息时给定的 ex
值:
1 public boolean completeExceptionally (Throwable ex)
任务完成后的下一步处理
1 2 3 public <U> CompletableFuture<U> thenApply (Function<? super T,? extends U> fn) public CompletableFuture<Void> thenRun (Runnable action) public CompletableFuture<Void> thenAccept (Consumer<? super T> action)
任务正常执行完成后,如果没有出现异常,则会回调上述方法。
thenApply()
:获取到上一个任务的执行结果,并返回处理结果;
thenRun()
:上一个任务正常执行完成后,执行 action
;
thenAccept()
:获取到上一个任务的执行结果,但不会返回处理结果。
除此之外,还要一个 thenCompose()
,获取上一个任务的执行结果,返回新的 CompletableFuture
,用于组合多个 CompletableFuture
,回调结果由第二个 CompletableFuture
返回:
1 2 public <U> CompletableFuture<U> thenCompose ( Function<? super T, ? extends CompletionStage<U>> fn)
thenApply()
与 thenCompose()
的区别:
thenApply()
相当于转换泛型类型,返回的是同一个 CompletableFuture
;
thenCompose()
则是使用上一个任务的执行结果,在下一个 CompletableFuture
中进行运算,返回新的 CompletableFuture
。
出现异常后的下一步处理
任务执行过程中,可能会出现异常,这时就会回调 exceptionally()
方法:
1 public CompletableFuture<T> exceptionally (Function<Throwable, ? extends T> fn)
它接收的 Function
类型的入参,是一个接收 Throwable
类型的参数并返回 T
类型结果的函数式接口,因此可以将任务执行过程中的异常吞掉。
当然,执行过程中如果没有发生异常,是不会回调该方法的。
执行结果与异常的“混合型”处理
前面的两种处理要么只能在任务完成后回调,要么只能在出现异常后回调,而下面的处理则是一种“混合型”,不论前一个任务是否执行成功还是失败,都会进行回调:
1 2 3 4 public <U> CompletableFuture<U> handle ( BiFunction<? super T, Throwable, ? extends U> fn) public CompletableFuture<T> whenComplete ( BiConsumer<? super T, ? super Throwable> action)
handle()
与 exceptionally()
类似,都能吞了异常,但 handle()
在上一个任务执行成功时也会被回调;
whenComplete()
并不影响上一个任务的返回值,主线程依旧能获取到上一个任务的返回值;与 exceptionally()
相比,它没法吞异常,也就是说主线程在获取执行过程中抛出异常的任务的返回值时,也会抛出异常。
集合关系
当某项处理需要依赖两个任务的执行结果时,可以使用 thenCombine()
,当依赖的两个任务都完成后,拿到它们的执行结果进行回调,并返回回调结果:
1 2 3 public <U,V> CompletableFuture<V> thenCombine ( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
也可以不返回回调结果:
1 2 3 public <U> CompletableFuture<Void> thenAcceptBoth ( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
甚至可以不需要依赖的两个任务的执行结果:
1 2 public CompletableFuture<Void> runAfterBoth (CompletionStage<?> other, Runnable action)
聚合关系
依赖的两个任务也不一定要都执行完,可以哪个执行得快就用哪个:
1 2 public <U> CompletableFuture<U> applyToEither ( CompletionStage<? extends T> other, Function<? super T, U> fn)
同样可以不返回回调结果:
1 2 public CompletableFuture<Void> acceptEither ( CompletionStage<? extends T> other, Consumer<? super T> action)
甚至不需要依赖的任务的执行结果:
1 2 public CompletableFuture<Void> runAfterEither (CompletionStage<?> other, Runnable action)
并行处理
CompletableFuture
还支持多个任务的依赖,allOf()
将在给定的所有任务都完成时,返回新的 CompletableFuture
:
1 public static CompletableFuture<Void> allOf (CompletableFuture<?>... cfs)
当然也可以是任何一个给定的任务完成时,返回新的 CompletableFuture
:
1 public static CompletableFuture<Object> anyOf (CompletableFuture<?>... cfs)
异步处理
在前文讲述的处理中,大多数都有两个同名但以 Async
结尾的方法。以 thenApply()
为例:
1 2 3 4 5 public <U> CompletionStage<U> thenApplyAsync (Function<? super T,? extends U> fn) public <U> CompletionStage<U> thenApplyAsync ( Function<? super T,? extends U> fn, Executor executor )
与 thenApply()
相比,thenApplyAsync()
会另开一个线程来执行下一阶段的处理,而 thenApply()
依旧使用执行上一个任务所用的线程。