CompletableFuture及反应式编程背后的概念
:::info
❏线程、Future以及推动Java支持更丰富的并发API的进化动力
❏ 异步API
❏ 从“线框与管道”的角度看并发计算
❏ 使用CompletableFuture结合器动态地连接线框❏ 构成Java 9反应式编程Flow API基础的“发布-订阅”协议❏ 反应式编程和反应式系统
:::
聚合模型
fork/join框架以及并行流都是非常有价值的并行处理工具。它们将一个任务切分为多个子任务,并将这些子任务分配到不同的核、CPU或者机器上去以并行的方式执行
处理并发而非并行任务,或者主要目标是在同一个CPU上执行多个松耦合的任务,要考虑的是在等待(很可能是很长的一段时间)远程服务的结果或者查询数据库时,尽可能地让这些核都忙起来,从而最大化应用的吞吐量,尽量避免线程阻塞和浪费宝贵的计算资源。
Java提供了两个主要的工具集。第一个就是Future接口,尤其是Java 8中提供的CompletableFuture,这通常是既简单又有效的解决方案。Java 9又新增了对反应式编程的支持,它基于Flow API实现了所谓的“发布-订阅”协议,使用它能提供更加细粒度的程序控制。
并发与并行这两种算法的差异。并发是一种编程属性(重叠地执行),即使在单核的机器上也可以执行,而并行是执行硬件的属性(同时执行)。
为支持并发而不断演进的Java
Java从一开始就提供了锁(通过synchronized类和方法)、Runnable以及线程。2004年,Java 5又引入了java.util.concurrent包,它能以更具表现力的方式支持并发,特别是ExecutorService[插图]接口(将“任务提交”与“线程执行”解耦)、Callable
Java 7为了使用fork/join实现分而治之算法,新增了java.util.concurrent.RecursiveTask, Java 8则增加了对流和流的并行处理(依赖于新增的Lambda表达式)的支持。
通过支持组合式的Future(基于Java 8 CompleteFuture实现的Future), Java进一步丰富了它的并发特性,Java 9提供了对分布式异步编程的显式支持。这些API为构建本章前面介绍的那种聚合型应用提供了思路和工具。在这种架构中,应用通过与各种网络服务通信,替用户实时整合需要的信息,或者将整合的信息作为进一步的网络服务提供出去。这种工作方式被称为反应式编程。Java 9通过“发布-订阅”协议(更具体地说,通过java.util.concurrent.Flow接口)增加了对它的支持。CompletableFuture及java.util.concurrent.Flow的关键理念是提供一种程序结构,让相互独立的任务尽可能地并发执行,通过这种方式最大化地利用多核或者多台机器提供的并发能力。
线程以及更高层的抽象
单CPU的计算机能支持多个用户,因为操作系统为每个用户创建了一个进程。操作系统为这些进程分配了相互独立的虚拟地址空间,这样每个用户都感觉他是在独占使用这台计算机。操作系统通过分时唤醒的方式让多个进程共享CPU资源,进一步地强化了这种假象。一个进程可以请求操作系统给它分配一个或多个线程——它们和主进程之间共享地址空间,因此可以并发地执行任务并相互协调。
在一个多核的环境中,单用户登录的笔记本电脑上可能只启动了一个用户进程,这种程序永远不能充分发挥计算机的处理能力,除非使用多线程。虽然每个核可以服务一个或多个进程或线程,但是如果程序并未使用多线程,那它同一时刻能有效使用的只有处理器众多核中的一个。
譬如有个四核CPU的机器,如果安排合理,让每个CPU核都持续不停地执行有效的任务,理论上程序的执行速度应该是单核CPU执行速度的四倍(当然,程序调度也会有开销,所以实际达不到这么多)。举个例子,假如有一个容量为1000000个数字大小的数组,其中保存了学生给出的正确答案的数目。下面的程序运行在单个线程上(该程序在单核年代运行得很顺畅):
long sum = 0;
for (int i = 0; i < 1_000_000; i++) {sum += stats[i];
}
//将上面的程序与使用四个线程的版本进行比较,其中第一个线程执行:
long sum0 = 0;
for (int i = 0; i < 250_000; i++) {sum0 += stats[i];
}
//第四个线程执行:
long sum3 = 0;
for (int i = 750_000; i < 1_000_000; i++) {sum3 += stats[i];
}
//这四个线程在main程序中通过Java的.start()方法启动,使用.join()等待其执行完成,最后执行计算:
sum = sum0 + ... + sum3;
//使用Java的Stream轻而易举地通过内部迭代而非外部迭代(显式的循环)实现并行:
sum = Arrays.stream(stats).parallel().sum();
并行流的迭代是比显式使用线程更高级的概念。换句话说,使用流(Stream)是对一种线程使用模式的抽象。将这种抽象引入流就像使用一种设计模式,带来的好处是程序员不再需要编写枯燥的模板代码了,库中的实现隐藏了代码大部分的复杂性。使用自Java 7才支持的java.util.concurrent.RecursiveTask,它对线程的fork/join进行了抽象,可以并发地执行分而治之算法,用一种更高级的方式在多核机器上高效地执行数组求和计算.
执行器和线程池
Java 5提供了执行器框架,其思想类似于一个高层的线程池,可以充分发挥线程的能力。执行器使得程序员有机会解耦任务的提交与任务的执行。
- 线程问题
操作系统(以及Java)的线程数都远远大于硬件线程数,因此即便一些操作系统线程被阻塞了,或者处于睡眠状态,所有的硬件线程还是会被完全占据,繁忙地执行着指令。举个例子,2016年英特尔公司生产的酷睿i7-6900K服务器处理器有八个核,每个核上有两个对称多处理(SMP)的硬件线程,这样算下来就有16个硬件线程。服务器上很可能有好多个这样的处理器,最终一台服务器上可能有64个硬件线程。与此相反,笔记本电脑可能就只有一个或者两个硬件线程,因此,移植程序时,不能想当然地假设可以使用多少个硬件线程。而某个程序中Java线程的最优数目往往依赖于硬件核的数目。
- 线程池的优势
Java的ExecutorService提供了一个接口,用户可以提交任务并获取它们的执行结果。期望的实现是使用newFixedThreadPool这样的工厂方法创建一个线程池
ExecutorService newFixedThreadPool(int nThreads)
这个方法会创建一个包含nThreads(通常称为工作线程)的ExecutorService,新创建的线程会被放入一个线程池,每次有新任务请求时,以先来先到的策略从线程池中选取未被使用的线程执行提交的任务请求。任务执行完毕之后,这些线程又会被归还给线程池。这种方式的最大优势在于能以很低的成本向线程池提交上千个任务,同时保证硬件匹配的任务执行。此外,你还有一些选项可以对ExecutorService进行配置,譬如队列长度、拒绝策略以及不同任务的优先级等。请注意这里使用的术语:程序员提供任务(它可以是一个Runnable或者Callable),由线程负责执行
- 线程池的不足
大多数情况下,使用线程池都比直接操纵线程要好,不过也需要特别留意使用线程池的两个陷阱
❏** 使用k个线程的线程池只能并发地执行k个任务**。提交的任务如果超过这个限制,线程池不会创建新线程去执行该任务,这些超限的任务会被加入等待队列,直到现有任务执行完毕才会重新调度空闲线程去执行新任务。通常情况下,这种工作模式运行得很好,它可以一次提交多个任务,而不必随机地创建大量的线程。
然而,采用这种方式时需要特别留意任务是否存在会进入睡眠、等待I/O结束或者等待网络连接的情况。一旦发生阻塞式I/O,这些任务占用了线程,却会由于等待无法执行有价值的工作。
- 假设你的CPU有4个硬件线程,创建的线程池大小为5,你一次性提交了20个执行任务。希望这些任务会并发地执行,直到所有20个任务执行完毕。
- 假设首批提交的线程中有3个线程进入了睡眠状态或者在等待I/O,那就只剩2个线程可以服务剩下的15个任务了。如此一来,只能取得你之前预期吞吐量的一半(如果创建的线程池中工作线程数为8,那么还是能取得同样预期吞吐量的)。如果早期提交的任务或者正在执行的任务需要等待后续任务,而这也正是Future典型的使用模式,那么可能会导致线程池死锁。
:::info
这里希望牢记的是,尽量避免向线程池提交可能阻塞(譬如睡眠,或者要等待某个事件)的任务,然而这一点在遗留系统中可能无法避免。
:::
❏ 通常情况下,Java从main返回之前,都会等待所有的线程执行完毕,从而避免误杀正在执行关键代码的线程。因此,实际操作时的一个好习惯是在退出程序执行之前,确保关闭每一个线程池(因为线程池中的工作线程在创建完后会由于要等待另一个任务执行完毕而无法正常终止)。实践中,经常使用一个长时间运行的ExecutorService管理需要持续运行的互联网服务。
其他的线程抽象:非嵌套方法调用
无论什么时候,任何任务(或者线程)在方法调用中启动时,都会在其返回之前调用同一个方法。换句话说,线程创建以及与其匹配的join()在调用返回的嵌套方法调用中都以嵌套的方式成对出现。这种思想被称为严格fork/join
一种更加松散的形式组织fork/join,这种方式下子任务从内部方法调用中逃逸出来,在外层调用中执行join,这样提供给用户的接口看起来还是一个普通调用
多种多样的并发形态,其中用户的方法调用创建的线程(或者派生的任务)可能比该调用方法的生命周期还长
这种类型的方法常常被称作异步方法,它的名字源于该方法所派生的任务会继续执行调用方法希望它完成的工作
采用这种方法会有哪些潜在的危害。
❏ 子线程与执行方法调用的代码会并发执行,因此为了避免出现数据竞争,编写代码时需要特别小心。
❏ 如果Java的main()方法在子线程终止之前返回,会发生什么情况?
有两种可能性,然而它们都不是我们期望的。
■ 等待所有的线程都执行完毕,再退出主应用的执行。
问题:可能由于等待一个一直无法顺利结束的线程,最终导致应用崩溃
■ 直接杀死所有无法正常终止的线程,然后退出程序的执行。
有可能中断一个写磁盘的I/O序列,导致外部数据出现不一致的现象
为了避免这些问题,需确保程序能有效地跟踪它创建的线程,且退出程序运行(包括线程池的关闭)之前必须加入这些线程。依据有没有执行setDaemon()方法,Java线程可以被划分为守护进程以及非守护进程。守护进程的线程在退出时就被终止(因此特别适合作为服务,因为它不会导致磁盘数据不一致),而从主程序返回的线程还得继续等待,直到所有非守护线程都终止了,应用才能退出执行。
同步及异步API
内部迭代(通过Stream提供的方法)替换外部迭代(显式的for循环)。接着,你可以使用parallel()方法对流进行处理,流中的元素会被Java运行时并发地处理,不再需要使用复杂的线程创建操作重写每一个循环
class ThreadExample {public static void main(String[] args) throws InterruptedException {int x = 1337;Result result = new Result();Thread t1 = new Thread(() -> { result.left = f(x); } );Thread t2 = new Thread(() -> { result.right = g(x); });t1.start();t2.start();t1.join();t2.join();System.out.println(result.left + result.right);}private static class Result {private int left;private int right;}}
使用Future API而不是Runnable进一步简化。之前已经建立了一个名为ExecutorService的线程池(譬如executorService)
public class ExecutorServiceExample {public static void main(String[] args)throws ExecutionException, InterruptedException {int x = 1337;ExecutorService executorService = Executors.newFixedThreadPool(2);Future<Integer> y = executorService.submit(() -> f(x));Future<Integer> z = executorService.submit(() -> g(x));System.out.println(y.get() + z.get());executorService.shutdown();}}
//显式调用submit时使用的模板代码的污染
API由同步API变为异步API。这种方式下,方法不再在物理返回其调用者的同时返回它的执行结果,被调用函数可以在返回结果就绪之前物理上提前返回调用函数
Future风格的API
Future<Integer> f(int x);Future<Integer> g(int x);
//改Future<Integer> y = f(x);Future<Integer> z = g(x);System.out.println(y.get() + z.get());
其思想是方法f会返回一个Future对象,该对象包含一个继续执行方法体中原始内容的任务,不过方法执行完f后会立刻返回,不会等待执行结果就绪。类似地,方法g也返回一个Future对象,第三行代码使用了一个get()方法等待这两个Future执行完毕,并计算它们的结果之和
保持方法g的API调用不变,仅在方法f中引入Future,并且不会降低其并发度。然而,对于大型的程序,建议你不要这样做,原因有两个。
❏ 其他使用函数g的地方可能也需要Future风格的版本,因此你最好使用统一的API风格。
❏ 为了充分发挥并行硬件的处理能力,以使程序运行得又快又好,将程序切分成更多粒度更细的任务是很有帮助的(当然也要控制在合理的范围之内)
反应式风格的API
核心思想是通过修改f和g的函数签名来使用回调风格的编程
void f(int x, IntConsumer dealWithResult);
//需要额外向f函数传递一个回调函数(其实是一个Lambda表达式)作为参数,
//f函数会在函数体中衍生一个任务,这个任务会在结果可用时使用它执行Lambda表达式,
//这样一来就不需要使用return返回值了。
//强调f函数衍生出执行函数体的任务后就立刻返回
public class CallbackStyleExample {public static void main(String[] args) int x = 1337;Result result = new Result();f(x, (int y) -> {result.left = y;System.out.println((result.left + result.right));} );g(x, (int z) -> {result.right = z;System.out.println((result.left + result.right));});}}
代码在打印输出正确结果(函数f和g调用之和)之前,打印输出的是最快拿到的值(偶尔还会打印输出两次计算的和,因为这段代码没有加锁,+的两个操作数既可以在打印输出执行之前更新,也可以在打印输出执行之后更新)。
解决这个问题有两种途径。
❏ 可以添加if-then-else判断,确定这两回调函数都已经执行完毕后再调用println打印输出和。为了达到这一目标,可能还需在恰当的位置添加锁。
❏ 还可以使用反应式风格的API。这种API主要适用于事件序列的处理,而非单一的结果。针对单一结果,采用Future可能更加合适。
:::info
反应式编程允许方法f和g多次调用它们的回调函数dealWithResult。而原始版本的f和g使用return返回结果,return只能被调用一次。Future与此类似,它也只能完成一次,执行Future的计算结果可以通过get()方法获取。某种程度上,反应式风格的异步API天然更适合于处理一系列的值(稍后会将它与流对比),而Future式的API更适合作为一次性处理的概念框架
:::
有害的睡眠及其他阻塞式操作
应用与用户或者其他应用交互时,往往需要限制事件发生的频率,一种很自然的方式是使用sleep()方法。
睡眠线程依旧会占用系统资源。如果睡眠的线程数目不多,一般没什么问题,有大量的线程处于睡眠状态需要解决
:::info
牢记的一点是,线程池中的任务即便是处于睡眠状态,也会阻塞其他任务的执行(它们无法停止已经分配了线程的任务,因为这些任务的调度是由操作系统管理的)。
:::
能阻塞线程池中可用线程执行的不仅仅只有睡眠。任何阻塞式操作都会产生同样的效果。
阻塞式操作可以分为两类:
一类是等待另一个任务执行,譬如调用Future的get()方法;
另一类是等待与外部交互的返回,譬如从网络、数据库服务器或者键盘这样的人机接口读取数据
更理想的方法是将你的任务切分成两部分——“之前”与“之后”——仅在程序执行未被阻塞时才由Java来调度“之后”部分的执行
假设以下两个任务都在线程池中执行。——A
它会被加入线程池的执行队列,之后开始执行。执行过程中,它被sleep调用阻塞,占用了工作线程10秒钟的时间,期间没有执行任何任务。接着它开始执行work2(),执行结束后释放工作线程
且在其睡眠期间占用了宝贵的线程时间——B
首先执行work1(),然后被终止——不过终止之前它会调度等待队列中的任务先执行work2() 10秒钟
调度执行了队列中的另一个任务(同时也消耗了几个字节的内存,然而并没有创建新的线程)
:::danger
创建任务时应该牢记于心的。任务在执行时会占用宝贵的系统资源,因此,你的目标是让它们持续地处于运行状态,直至其执行完毕,或者释放出使用的资源。任务在提交完后续任务后应该终止执行,而不是被阻塞
同样应用于I/O。任务启动读方法调用,或者读结束终止读方法调用,请求运行时库调度一个后续任务,都应该使用非阻塞操作,尽量不要使用传统的阻塞式读取
:::
线程数量是无限的,并且创建线程的开销可以忽略不计的话,那么代码A和代码B都是不错的解决方案。然而并非如此,只要你的任务中有线程可能进入睡眠状态,或者会被阻塞,这种情况下代码B无疑是更好的方案
验证
设计一个新系统
:::danger
能充分利用并行硬件的处理能力,那么把它设计成大量小型、并发的任务
同时以异步调用的方式实现所有可能阻塞的操作很可能是最理想的途径。
然而,这种“全异步”(everything asynchronous)的设计模式可能并不符合项目实际情况
Netty,它提供了用于创建网络服务器的统一的阻塞/非阻塞API
:::
如何使用异步API进行异常处理
无论是基于Future的异步API还是反应式异步API,被调方法的概念体(conceptual body)都在另一个线程中执行,调用方很可能已经退出了执行,不在调用异常处理器的作用域内。
很明显,这种非常规行为触发的异常需要通过其他的动作来处理
Future的CompletableFuture实现中包含的get()方法可以返回异常的信息,还可以通过像exceptionally()这样的方法进行异常恢复
对于反应式异步API,需要修改接口以引入额外的回调函数,这个回调函数会在触发异常时被调用,其方式就像不使用return返回,而是执行设定的回调函数一样。为了实现这种设计,需要在反应式API中使用多个回调函数
void f(int x, Consumer<Integer> dealWithResult,Consumer<Throwable> dealWithException);
//接着函数f的函数体可能会执行:
dealWithException(e);
如果有多个回调函数,将它们等价地封装成单一对象中的方法来传递一个对象而不是传递多个回调函数。
譬如,Java 9的Flow API就将多个回调函数封装成了一个对象(即Subscriber
void onComplete()
void onError(Throwable throwable)
void onNext(T item)
相互独立的回调函数代表了不同的含义,譬如值可以访问了(onNext)、获取值时发生了异常(onError),或者程序收到信号接下来没有新的数据(或者异常),此时就会调用onComplete函数
void f(int x, Subscriber<Integer> s);//f函数体现在借助执行下面的操作,以Throwable t的形式表示了一个异常
s.onError(t);
线框-管道——模型
设计和理解并发系统最好的方式是使用图形。
将这种技术称为线框-管道(box-and-channel)模型。
设想一个使用整型的简单场景,希望对之前计算f(x)+g(x)的例子做一个归纳。
现在想使用参数x调用方法(或函数)p,并将计算的结果作为参数传递给函数q1和q2,接着使用这两个调用的结果去调用方法(或函数)r,然后打印结果(为了避免混乱,这里不再区分类C的方法m以及它关联的函数C::m)。
//第一种方式是
/*
起来很清晰,不过Java会顺次执行对q1和q2的调用,而这是你希望避免的,
因为你的目标是要充分利用硬件的并行处理能力
*/
int t = p(x);
System.out.println( r(q1(t), q2(t)) );//另一种方法是使用Future并行地执行方法f和g:
/*
由于“线框-管道”图的形状,这个例子中并未用Future封装p和r。
p需要在其他所有任务之前完成,而r需要在其他所有任务之后执行
*/
int t = p(x);
Future<Integer> a1 = executorService.submit(() -> q1(t));
Future<Integer> a2 = executorService.submit(() -> q2(t));
System.out.println( r(a1.get(), a2.get()));/**
这段代码中,我们需要用Future封装五个使用的函数(p、q1、q2、r和s)才能获得最大程度的并发
*/
System.out.println( r(q1(t), q2(t)) + s(x) );
:::info
这一方案在系统并发度不大的情况下工作得很好。但如果系统变得越来越大,
带有很多相互独立的“线框-管道”图,甚至有些线框内部还使用了自己的线框和管道会怎样呢?这种情况下,大量的任务(由于调用了get()方法)会处于等待Future结束的状态,导致最终无法充分发挥硬件的并发处理能力,甚至出现死锁。
此外,要深入理解这么大规模系统的结构才能确定多少任务容易由于执行get()处于等待状态,而这是非常困难的。Java 8的解决方案是使用结合器CompletableFuture
可以使用compose()和andThen()这样的方法将两个方法合成一个新的方法
:::
假设方法add1的功能是将l和一个整型数相加,而dble可以倍增一个整型数,那么你可以编写下面的代码,创建一个函数对它的参数执行倍增操作,并将计算结果与l求和返回
Function<Integer, Integer> myfun = add1.andThen(dble);
/*“线框-管道”图也可以直接使用结合器实现,效果同样不错。
可以借助Java的Function p、q1、q2以及BiFunction r简洁地表示
*/
p.thenBoth(q1, q2).thenCombine(r)
//无论是thenBoth还是thenCombine,其形式都不属于Java的Function或BiFunction类
:::info
线框-管道”模型可以帮助你梳理思路和代码。
某种程度上,提升了构建大型系统的抽象层次。
通过画线框(或者在程序中使用结合器)表达希望执行的计算,接着该线框被执行,这种方式比你直接手写计算任务可能高效不少。
结合器不仅适合数学计算,也适合Future和反应式数据流
:::
为并发而生的CompletableFuture和结合器
Future接口的一个问题是它是一个接口,需要思考如何设计并发代码结构才能采用Future实现任务。
历史上,除了FutureTask这一实现之外,Future也提供了其他几个动作:创建一个Future指定它执行某个计算任务,执行任务,等待执行终止,等等。新版Java提供了更多结构的支持(譬如RecursiveTask)。
Java 8为带来是对组合式Future的支持。使用Future接口的Completable-Future实现,可以创建组合式的Future
普通的Future通常是通过一个Callable创建的,执行完毕后,使用get()获得执行的结果。而CompletableFuture允许用户创建一个未指定运行任何代码的Future对象,之后由complete()方法指定其他的线程和值(这里是变量名)完成任务的执行,这样get()方法就能获得返回值
public class CFComplete {
@@public static void main(String[] args)throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(10);int x = 1337;
@@CompletableFuture<Integer> a = new CompletableFuture<>();executorService.submit(() -> a.complete(f(x)));int b = g(x);System.out.println(a.get() + b);executorService.shutdown();}}//OR public class CFComplete {public static void main(String[] args)throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(10);int x = 1337CompletableFuture<Integer> a = new CompletableFuture<>();executorService.submit(() -> b.complete(g(x)));int a = f(x);System.out.println(a + b.get())executorService.shutdown();}
}
这两种代码实现都会浪费处理资源,因为有线程执行get()调用而阻塞——前一段代码中的f(x)可能占用较长的时间,后一段代码中g(x)可能占用较长的时间。使用Java 8的CompletableFuture能解决这个问题
:::danger
两个活跃线程分别执行着f(x)和g(x),第一个线程执行完毕时,立刻启动一个新的线程返回计算的结果。
怎样才能编写任务,充分利用线程的处理能力
答案是,你可以让第一个任务执行f(x),第二个任务执行g(x),第三个任务(它既可以是一个新的线程,也可以复用现存线程中的一个)计算二者之和。
如果第三个任务在前两个任务结束之前不能开始执行。解决方案是使用Future的组合操作。
:::
组合操作是一种强大的程序构造思想,存在于多种语言之中。伴随着Java 8Lambda表达式的引入。组合思想的一个例子是在Stream上操作的组合,如下所示
myStream.map(...).filter(...).sum()
这一思想的另一个实例是你可以对两个函数使用compose()和andThen(),生成一个新的函数。使用CompletableFuture
//thenCombine的方法签名如下
//(为了避免被泛型和通配符搞晕,这里进行了一些简化
CompletableFuture<V> thenCombine(CompletableFuture<U> other,BiFunction<T, U, V> fn)/**
这个方法接受两个CompletableFuture值(返回结果类型分别是T和U),
并创建一个新值(返回结果类型为V)。前两个值执行结束时,
它取得其执行结果,并将结果传递给fn处理,
完成返回结果Future的构造,整个过程都没有阻塞发生
*/
public class CFCombine {public static void main(String[] args) throws ExecutionException,InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(10);int x = 1337;CompletableFuture<Integer> a = new CompletableFuture<>();CompletableFuture<Integer> b = new CompletableFuture<>();CompletableFuture<Integer> c = a.thenCombine(b, (y, z)-> y + z);executorService.submit(() -> a.complete(f(x)));executorService.submit(() -> b.complete(g(x)));System.out.println(c.get());executorService.shutdown();}
}
/**
thenCombine这一行代码非常关键:
它在完全不了解Future对象a和b要执行什么计算任务的前提下,
在线程池中创建了一个计划执行的新计算任务。
这个新的执行任务只在前两个执行任务完成之后才会被启动。
第三个执行任务c会对前两个执行任务的结果进行求和,
(最重要的是)
它直到前两个执行任务执行完毕之后才被授权可以在线程上执行,
而不是一开始就启动执行,然后阻塞等待前两个线程执行结束。
因此,这种设计实际不存在等待的操作,
而之前两个版本的代码都有阻塞等待的问题
**/
之前两个版本的代码都有阻塞等待的问题。前两个版本中,如果Future中的计算任务先完成的是第二个,那么线程池中的两个线程都会处于活动状态,即便你这时只需要一个线程执行计算任务
前两个版本中,计算y+x都在固定的线程中,要么是计算f(x)的线程,要么是计算g(x)的线程——二者之间都可能发生等待。与此相反,采用thenCombine调度求和计算,它只会在f(x)和g(x)都完成之后才进行。
对很多程序而言,并不关心少数的线程由于调用了get()方法会被阻塞,因此Java 8之前的Future依然是一种有价值的编程选择。
然而,如果需要处理大量的Future对象(譬如处理大量的服务请求),那么在这种情况下,避免由于调用get()产生的阻塞、并发性的损失甚至是死锁,使用CompletableFuture以及结合器通常是最佳的选择
发布-订阅 及 反应式编程
Future和CompletableFuture的思维模式是计算的执行是独立且并发的。使用get()方法可以在执行结束后获取Future对象的执行结果。因此,Future是一个一次性对象,它只能从头到尾执行代码一次。
:::danger
与此相反:反应式编程的思维模式是类Future的对象随着时间的推移可以产生很多的结果
:::
举例:
假设你要处理一个温度计对象。你的期望是,温度计对象会持续不断地生成结果,以每隔几秒的频率为你提供温度数据。另一个例子是Web服务器的监听组件对象。该组件监听来自网络的HTTP请求,并根据请求的内容返回相应的数据。接着,其他的代码会对结果数据进行处理,譬如温度或者HTTP请求中的数据。然后温度计和监听对象会继续检测温度、监听请求,直到有新的数据到来,周而复始。
这里有两点需要注意。核心的一点是,这些例子与Future非常像,然而它们可以完成(或产生)多次,而非一次性的操作。另一点是,第二个例子中,之前收到的结果与之后收到的结果可能都重要,而对温度计而言,大多数用户只关心最近的温度。但是,为什么要把这种编程叫作反应式的呢?答案是,程序的另一部分可能需要对低温报告做出反应,比如打开加热器
:::danger
反应式编程的模式更具表现力
一个Java流只能由一个终端操作使用
:::
流的编程模式让它很难表达一些类似流的操作,譬如将一个序列值进行切分,交由两个流水线(就像fork那样)来处理;或者处理和整合来自两个相互独立的流中的元素(就像join那样)。流支持的都是线性处理的流水线。
Java 9使用java.util.concurrent.Flow提供的接口对反应式编程进行建模,实现了名为“发布-订阅”的模型(也叫协议,简写为pub-sub)。
三个主要概念:
❏ 订阅者可以订阅的发布者;
❏ 名为订阅的连接;
❏ 消息(也叫事件),它们通过连接传输。
订阅(subscription)就像是管道,发布者和订阅者类似于线框上的端口。
多个组件可以向同一个发布者订阅,
一个组件既可以发布多个相互独立的流,也可以向多个发布者订阅
示例两个流求和
“发布-订阅”的一个简单却典型的例子是整合两个信息源的事件并发布给其他用户使用。
概念上把它想象成一个电子表格。
假设电子表格中的一个单元格包含着公式。
我们对电子表格的单元格C3进行建模,该单元格包含了公式“=C1+C2”。只要C1或者C2被更新(无论是有人对它进行了更新,还是因为该表格包含了其他的公式), C3也会更新以反映这些变化。
假设下面的代码中,唯一可用的操作就是对单元格的值进行求和。
//对保存值的单元格进行建模
private class SimpleCell {private int value = 0;private String name;public SimpleCell(String name) {this.name = name;}
}//初始化几个单元格
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");/**
怎样才能指定当C1或C2的值发生变化时,
C3会对这两个值重新进行求和计算呢?
你需要一个途径让C3可以订阅C1和C2的事件。
为了达到这一目标,我们引入了接口Publisher<T>,
它的核心代码看起来像下面这样
**/
interface Publisher<T> {void subscribe(Subscriber<? super T> subscriber);
}
/**接口接受一个它可以通信的订阅者作为参数
Subscriber<T>接口提供了一个简单的方法onNext,
它接受信息作为参数
**/
interface Subscriber<T> {void onNext(T t);
}
如何融合?
单元格实际上既是一个发布者(它可以向其他单元格发布自己的事件)也是一个订阅者(需要依据其他单元格的事件进行响应)。Cell类的实现如下所示
Simplecell c3 = new SimpleCell("C3");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");c1.subscribe(c3);c1.onNext(10); // 更新C1 的值为10
c2.onNext(20); // 更新C2 的值为20
ArithmeticCell c3 = new ArithmeticCell("C3");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");c1.subscribe(c3::setLeft);
c2.subscribe(c3::setRight);c1.onNext(10); // 更新C1 的值为10
c2.onNext(20); // 更新C2 的值为20
c1.onNext(15); // 更新C1 的值为15//print
C1:10
C3:10
C2:20
C3:30
C1:15
C3:35/**当C1更新为15时,C3会立刻进行响应,也同步更新它的值。
发布者-订阅者交互的奇妙之处
在于你可以建立发布者与订阅者之间的一幅图
**/
创建另一个单元格C5,通过表达式“C5=C3+C4”,可以指定它依赖于C3和C4
//ArithmeticCell c5 = new ArithmeticCell("C5");
ArithmeticCell c3 = new ArithmeticCell("C3");
SimpleCell c4 = new SimpleCell("C4");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");c1.subscribe(c3::setLeft);
c2.subscribe(c3::setRight);c3.subscribe(c5::setLeft);
c4.subscribe(c5::setRight);//更新
c1.onNext(10); // 更新C1 的值为10
c2.onNext(20); // 更新C2 的值为20
c1.onNext(15); // 更新C1 的值为15
c4.onNext(1); // 更新C4 的值为1
c4.onNext(3); // 更新C4 的值为3//这些动作产生了下面的输出
C1:10
C3:10
C5:10
C2:20
C3:30
C5:30
C1:15
C3:35
C5:35
C4:1
C5:36
C4:3
C5:38//最终C5的值是38,因为C1是15, C2是20,而C4是3
:::danger
术语(发布—订阅 核心思想)
由于数据的流动是从发布者(生产者)流向订阅者(消费者),
因此程序员经常使用诸如向上流(upstream)和向下流(downstream)这样的术语。
前面的示例代码中,
向上流onNext()方法接收的数据newValue是由notifyAllSubscribers()方法传递给向下游的onNext()方法的。
:::
使用流进行编程时,希望发送信号给对象,而不是处理一个onNext事件,因此订阅者(监听者)需要定义onError和onComplete方法。这样一来,发布者才有机会告诉订阅者发生了异常,并终止数据流的发送(譬如,温度计的例子中,温度计可能被替换了,再也无法通过onNext方法返回更多的数据)。Java 9 Flow API中的Subscriber接口提供了对onError和onComplete方法的支持。这些方法是“发布-订阅”协议比传统的观察者模式更加强大的原因之一。
两个简单却重要的概念,即压力和背压,极大地丰富了Flow接口。对能否充分利用线程的处理能力影响很大
压力
还是以温度计为例,假设它之前以每隔几秒钟的频率返回温度数据,之后温度计进行了升级,能以更高的频率提供数据信息,譬如每毫秒报告一次温度数据。
- 程序能以足够快的速度响应这些事件么?
- 遭遇这种情况会不会发送缓冲区溢出,甚至是程序崩溃(回想一下我们之前碰到的问题场景,线程池一下涌入大量要处理的任务,同时又有一些任务被阻塞)?
- 类似地,假设向一个发布者订阅了服务,该服务会为提供SMS消息服务,将SMS消息推送到手机上。
- 这项订阅刚开始可能工作得很好,因为新手机上只有有限的几条SMS消息,几年之后,SMS消息的数量已经累积到数以千计的规模,这时候会发生什么情况呢?
- 这些消息可能在一秒钟内调用onNext发送完毕么?
这种情况通常被称作压力
假设有一个垂直的管道,其中装着标记了消息的球。你还需要一种“背压”机制,譬如限制多少球可以被加入圆筒。背压在Java 9的Flow API中是通过request()方法实现的。request()方法定义在一个新的接口Subscription中,该方法邀请发布者按照约定的数量发送下一次的元素,而不是以无限的速率发送元素(即采用拉模式,而不是推模式)。
背压
向Publisher传递一个Subscriber对象(它包含了onNext、onError和OnComplete方法),Publisher会在恰当的时候调用该对象。这个对象被用于在Publisher与Subscriber之间传递信息。通过背压(流量控制),可以限制信息传输的速率。
在此之前,需要通过Subscriber向Publisher发送相关的限制信息。还需要解决一个问题,一个Publisher可能有多个Subscriber。希望设置的背压只对点对点的连接生效,不影响其他的连接
//解决这个问题,Java 9 Flow API中的Subscriber接口提供了第4个方法
void onSubscribe (Subscription subscription);
当第一个事件通过Publisher与Subscriber之间的管道发送时,该方法就会被调用执行。Subscription对象包含的方法可以帮助Subscriber与Publisher进行通信
interface Subscription {void cancel ();void request (long n);
}/**
注意,回调函数经常有的“似乎后向兼容”效果。
Publisher创建了Subscription对象,并将其传递给Subscriber,
后者又可以调用它的方法由Subscriber向Publisher回传信息
**/
一种简单的真实背压
“发布-订阅”连接每次只处理一个事件
❏ 在Subscriber中本地存储由OnSubscribe方法传递的Subscription对象,为此,你可能需要为其添加一个subscription字段。
❏ 让onSubscribe、onNext和onError(有可能也需要)的最后一个动作都是使用channel.request(1)请求下一个事件(注意只请求一个事件,避免Subscriber被太多的事件淹没)。
❏ 修改Publisher,让本例中的notifyAllSubscribers方法只对提交了请求的管道发送onNext或者onError事件。
❏ 通常,Publisher会创建一个新的Subscription对象,并将其与Subscriber一一对应,这样才能确保多个Subscriber可以按照自己设定的背压处理数据。
流程看似简单,但实现背压时还需要额外考虑一系列的取舍
❏ 你是否要以最低速度向多个Subscriber发送事件?或者你是否要为每个Subscriber维护一个单独的未发送数据队列?
❏ 如果这些队列增长过快,会发生什么情况?
❏ 如果Subscriber还未准备好接收数据,你会丢弃事件么?
“基于拉模式的反应式背压”这一概念。之所以称其为“基于拉模式的反应式”,是因为它为Subscriber提供了一种途径,借助于事件(反应式)去“拉取”(通过request方法)Publisher提供的更多信息。其结果就是背压机制。
反应式系统和反应式编程
反应式系统是一个程序,其架构很灵活,可以在运行时调整以适应变化的需求。三大特性可以概括为响应性、韧性和弹性。
响应性意味着反应式系统不能因为正在替某人处理一个大型任务就延迟其他用户的查询请求,它必须实时地对输入进行响应。韧性意味着系统不能因为某个组件失效就无法提供服务。某个网络连接出现问题,不应该影响其他网络的查询服务,对无法响应组件的查询应该被重新路由到备用组件上。弹性意味着系统可以调整以适应工作负荷的变化,持续高效地运行。
就像可以在酒吧中动态调整提供食物和提供酒水服务的员工,让两个队列的等待时间都保持一致,同样,也可以调整各种软件服务的工作线程数,避免工作线程处于闲等状态,以使每个队列都能高效地处理。
主流的方式是使用由Java的java.util. concurrent.Flow接口提供的反应式编程。这些接口的设计反映了反应式宣言的第4个,也是最后一个属性,即消息驱动。消息驱动的系统基于线框-管道模型提供了内部API,组件等待要处理的输入,处理结果通过消息发送给其他的组件,以这种方式创建了一个反应式系统。
小结
❏ Java对并发的支持由来已久,并且还在持续演进。通常而言,线程池技术很有帮助,然而如果你有大量可能阻塞的任务,使用它反而会带来麻烦。
❏ 方法异步化(在完成它们的工作之前返回)能提升程序的并发度,其可以与用于循环结构的优化进行互补。
❏ 使用线框-管道模型可以对异步系统进行可视化。
❏ Java 8的CompletableFuture类和Java 9的Flow API都可以通过线框-管道图表示。
❏ CompletableFuture类常用于一次性的异步计算。使用结合器可以组合多个异步计算,并且无须担心使用Future时的阻塞风险。
❏ Flow API基于“发布-订阅”协议,它与背压一起构成了Java反应式编程的基础。❏ 反应式编程可以帮助实现反应式系统。
CompletableFuture:组合式异步编程
Future接口
Java 5引入了Future接口,它的设计初衷是对将来某个时刻会发生的结果进行建模。举个例子,调用方发起远程服务查询时,它是无法立刻得到查询结果的。采用Future接口可以对异步计算进行建模,返回一个指向执行结果的引用,运算结束后,调用方可以通过该引用访问执行的结果。在Future中触发那些可能耗时的调用,能够将调用线程解放出来,让它们继续执行其他有价值的工作,不必呆呆等待耗时的操作完成。
Future的另一大优点是它比更底层的Thread更好用。要使用Future,通常你只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService,就好了。
这种编程方式让线程可以在ExecutorService以并发方式调用另一个线程执行耗时操作的同时,去执行一些其他的任务。
接着,如果已经运行到没有异步操作的结果就无法继续任何有意义的工作时,可以调用它的get方法去获取操作的结果。如果操作已经完成,该方法会立刻返回操作的结果,否则它会阻塞线程,直到操作完成,返回相应的结果。
虽然Future提供了一个无须任何参数的get方法,推荐使用重载版本的get方法,它接受一个超时的参数,通过它,可以定义线程等待Future结果的最长时间,而不是永无止境地等待下去
Future接口的局限性
Future接口提供了方法来检测异步计算是否已经结束(使用isDone方法),等待异步操作结束,以及获取计算的结果。但是这些特性还不足以编写简洁的并发代码。
难表述Future结果之间的依赖性;从文字描述上这很简单,“当长时间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并”。
描述能力特性
❏ 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
❏ 等待Future集合中的所有任务都完成。
❏ 仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。
❏ 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。
❏ 应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。
CompletableFuture类(它实现了Future接口)如何利用Java 8的新特性以更直观的方式将上述需求都变为可能。Stream和CompletableFuture的设计都遵循了类似的模式:都使用了Lambda表达式以及流水线的思想。从这个角度,可以说CompletableFuture和Future的关系就跟Stream和Collection的关系一样。
使用CompletableFuture构建异步应用
”最佳价格查询器”(best-price-finder)的应用,查询多个在线商店,依据给定的产品或服务找出最低的价格。这个过程中,你会学到几个重要的技能。
❏ 首先,你会学到如何为你的客户提供异步API(如果你拥有一间在线商店的话,这是非常有帮助的)。
❏ 其次,掌握如何使用了同步API的代码变为非阻塞代码。会了解如何使用流水线将两个接续的异步操作合并为一个异步计算操作。这种情况肯定会出现,比如,在线商店返回了你想要购买商品的原始价格,并附带着一个折扣代码——最终,要计算出该商品的实际价格,不得不访问第二个远程折扣服务,查询该折扣代码对应的折扣比率。
❏ 会学到如何以响应式的方式处理异步操作的完成事件,以及随着各个商店返回它的商品价格,最佳价格查询器如何持续地更新每种商品的最佳推荐,而不是等待所有的商店都返回他们各自的价格(这种方式存在着一定的风险,一旦某家商店的服务中断,用户就可能遭遇白屏)。
:::danger
同步API与异步API
同步API 其实只是对传统方法调用的另一种称呼:
调用了某个方法,调用方在被调用方执行的过程中会等待,被调用方执行结束返回,调用方取得被调用方的返回值并继续运行。即使调用方和被调用方在不同的线程中运行,调用方还是需要等待被调用方结束运行,这就是阻塞式调用名字的由来。
与此相反,异步API会直接返回,或者至少在被调用方计算完成之前,将它剩余的计算任务交由另一个线程去做,该线程和调用方是异步的——这就是非阻塞式调用的由来。
执行剩余计算任务的线程会将它的计算结果返回给调用方。返回的方式要么是通过回调函数,要么是由调用方再次执行一个“等待,直到计算完成”的方法调用。这种风格的计算在I/O系统程序设计中很常见:发起了一次磁盘访问,如果同时还有很多其他计算任务,那这次访问与其他计算任务会异步执行,完成其他任务没有别的事情做时,会等待磁盘块载入内存。
注意,阻塞和非阻塞通常用于描述操作系统的某种I/O实现。
然而,这些术语也常常等价地用在非I/O的上下文中,即“异步调用”和“同步调用”。
:::
实现异步API
public class Shop {public double getPrice(String product) {// 待实现}
}
/**该方法的内部实现会查询商店的数据库,
但也有可能执行一些别的耗时的任务,
比如联系其他外部服务
(比如,商店的供应商,或者跟制造商相关的推广折扣)
**/
//模拟1秒钟延迟的方法
public static void delay() {try {Thread.sleep(1000L);} catch (InterruptedException e) {throw new RuntimeException(e);}
}
/**getPrice方法会调用delay方法,并返回一个随机计算的值
取巧
使用charAt,依据产品的名称,生成一个随机值作为价格
**///在getPrice方法中引入一个模拟的延迟
public double getPrice(String product) {
return calculatePrice(product);
}
private double calculatePrice(String product) {delay();return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
这个API的使用者(这个例子中为最佳价格查询器)调用该方法时,它依旧会被阻塞。为等待同步事件完成而等待1秒钟,这是无法接受的,尤其是考虑到最佳价格查询器对网络中的所有商店都要重复这种操作。
将同步方法转换为异步方法
将getPrice转换为getPriceAsync方法,并修改它的返回值
public Future<Double> getPriceAsync(String product) { ... }
Java 5引入了java.util.concurrent.Future接口表示一个异步计算(即调用线程可以继续运行,不会因为调用方法而阻塞)的结果。
意味着Future是一个暂时还不可知值的处理器,这个值在计算完成后,可以通过调用它的get方法取得。
因为这样的设计,getPriceAsync方法才能立刻返回,给调用线程一个机会,能在同一时间去执行其他有价值的计算任务。
新的CompletableFuture类提供了大量的方法,让我们有机会以多种可能的方式轻松地实现这个方法
:::danger
创建了一个代表异步计算的CompletableFuture对象实例,它在计算完成时会包含计算的结果。
接着,调用fork创建了另一个线程去执行实际的价格计算工作,不等该耗时计算任务结束,直接返回一个Future实例。
当请求的产品价格最终计算得出时,可以使用它的complete方法,结束CompletableFuture对象的运行
:::
显然,这个新版Future的名称也解释了它所具有的特性。使用这个API的客户端,可以通过下面的这段代码对其进行调用
客户向商店查询了某种商品的价格。由于商店提供了异步API,该次调用立刻返回了一个Future对象,通过该对象客户可以在将来的某个时刻取得商品的价格。
这种方式下,客户在进行商品价格查询的同时,还能执行一些其他的任务,比如查询其他家商店中商品的价格,而不会呆呆地阻塞在那里等待第一家商店返回请求的结果。
最后,如果所有有意义的工作都已经完成,客户所有要执行的工作都依赖于商品价格时,就再调用Future的get方法。执行了这个操作后,客户要么获得Future中封装的值(如果异步任务已经完成),要么发生阻塞,直到该异步任务完成,期望的值能够访问
Invocation returned after 43 msecs
Price is 123.26
Price returned after 1045 msecs
getPriceAsync方法的调用返回远远早于最终价格计算完成的时间
避免发生客户端被阻塞的风险:Future执行完毕可以发送一个通知,仅在计算结果可用时执行一个由Lambda表达式或者方法引用定义的回调函数
错误处理
如果价格计算过程中产生了错误会怎样呢?
用于提示错误的异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待get方法返回结果的客户端永久地被阻塞。
解决:
- 客户端可以使用重载版本的get方法,它使用一个超时参数来避免发生这样的情况。
- 代码中添加超时判断的逻辑,避免发生类似的问题。使用这种方法至少能防止程序永久地等待下去,超时发生时,程序会得到通知发生了TimeoutException
让客户端能了解商店无法提供请求商品价格的原因,需要使用CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问题的异常抛出
客户端现在会收到一个ExecutionException异常,该异常接受了一个包含失败原因的Exception参数,即价格计算方法最初抛出的异常。
如果该方法抛出了一个运行时异常“product isn't available”,客户端就会得到像下面这样一段ExecutionException:
Exception in thread "main" java.lang.RuntimeException:
java.util.concurrent.ExecutionException: java.lang.RuntimeException:
product not available
at java89inaction.chap16.AsyncShopClient.main(AsyncShopClient.java:16)
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException:
product not available
at java.base/java.util.concurrent.CompletableFuture.reportGet
(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get
(CompletableFuture.java:1999)
at java89inaction.chap16.AsyncShopClient.main(AsyncShopClient.java:14)
Caused by: java.lang.RuntimeException: product not available
at java89inaction.chap16.AsyncShop.calculatePrice(AsyncShop.java:38)
at java89inaction.chap16.AsyncShop.lambda$0(AsyncShop.java:33)at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run
(CompletableFuture.java:1700)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec
(CompletableFuture.java:1692)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:283)
at java.base/java.util.concurrent.ForkJoinPool.runWorker
(ForkJoinPool.java:1603)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run
(ForkJoinWorkerThread.java:175)
使用工厂方法supplyAsync创建CompletableFuture
CompletableFuture类自身提供了大量精巧的工厂方法,使用这些方法能更容易地完成整个流程,还不用担心实现的细节
//使用工厂方法supplyAsync创建CompletableFuture对象
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。
生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,但是你也可以使用supplyAsync方法的重载版本,传递第二个参数指定不同的执行线程执行生产者方法。
向CompletableFuture的工厂方法传递可选参数,指定生产者方法的执行线程是可行的
:::info
getPriceAsync方法返回的CompletableFuture对象
与
手工创建和完成的CompletableFuture对象是完全等价的
这意味着它提供了同样的错误管理机制,而前者花费了大量的精力才得以构建。
:::
让代码免受阻塞之苦
List<Shop> shops = List.of(new Shop("BestPrice"),new Shop("LetsSaveBig"),new Shop("MyFavoriteShop"),new Shop("BuyItAll"));
使用下面这样的签名实现一个方法,它接受产品名作为参数,返回一个字符串列表,这个字符串列表中包括商店的名称和该商店中指定商品的价格
public List<String> findPrices(String product);
顺序查询所有商店的方式实现的findPrices方法
public List<String> findPrices(String product) {
return shops.stream()
.map(shop -> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product)))
.collect(toList());
}
验证findPrices的正确性和执行性能
long start = System.nanoTime();
System.out.println(findPrices("myPhone27S"));
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Done in " + duration + " msecs");
//输出
/**
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
is 214.13, BuyItAll price is 184.74]
Done in 4032 msecs
**/
findPrices方法的执行时间仅比四秒钟多了那么几毫秒,因为对这四个商店的查询是顺序进行的,并且一个查询操作会阻塞另一个,每一个操作都要花费大约1秒左右的时间计算请求商品的价格
使用并行流对请求进行并行操作
新版findPrices的改进
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
is 214.13, BuyItAll price is 184.74]
Done in 1180 msecs
现在对4个不同商店的查询实现了并行,所以完成所有操作的总耗时只有1秒多一点儿。尝试使用刚学过的Completable-Future,将findPrices方法中对不同商店的同步调用替换为异步调用。
使用CompletableFuture发起异步请求
使用工厂方法supplyAsync创建CompletableFuture对象
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product))))
.collect(toList());
得到一个List<CompletableFuture
由于用CompletableFuture实现的findPrices方法要求返回一个List
向最初的List<CompletableFuture
注意CompletableFuture类中的join方法和Future接口中的get方法有相同的含义,并且也声明在Future接口中,它们唯一的不同是join不会抛出任何检测到的异常。使用它不再需要使用try/catch语句块让你传递给第二个map方法的Lambda表达式变得过于臃肿。
这里使用了两个不同的Stream流水线,而不是在同一个处理流的流水线上一个接一个地放置两个map操作——这其实是有缘由的。
考虑流操作之间的延迟特性,如果在单一流水线中处理流,那么发向不同商家的请求只能以同步、顺序执行的方式才会成功。
因此,每个创建CompletableFuture对象只能在前一个操作结束之后执行查询指定商家的动作,通知join方法返回计算结果
上半部分展示了使用单一流水线处理流的过程,执行的流程(以虚线标识)是顺序的。
事实上,新的CompletableFuture对象只有在前一个操作完全结束之后,才能创建。
与此相反,下半部分展示了如何先将CompletableFuture对象聚集到一个列表中(即图中以椭圆表示的部分),让对象们可以在等待其他对象完成操作之前就能启动。
:::info
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74]
Done in 2005 msecs
:::
超过两秒意味着利用CompletableFuture实现的版本,原生顺序执行且会发生阻塞的版本快。但是它的用时也差不多是使用并行流的前一个版本的两倍。尤其是,考虑到从顺序执行的版本转换到并行流的版本只做了非常小的改动,不尽人意
寻找更好的方案
并行流的版本工作得非常好,那是因为它能并行地执行四个任务,所以它几乎能为每个商家分配一个线程。但是,如果想要增加第5个商家到商店列表中,让“最佳价格查询”应用对其进行处理,那这时会发生什么情况?毫不意外,顺序执行版本的执行还是需要大约5秒多钟的时间
并行流版本的程序这次比之前也多消耗了差不多1秒钟的时间,因为可以并行运行(通用线程池中处于可用状态)的四个线程现在都处于繁忙状态,都在对前四个商店进行查询。第5个查询只能等到前面某一个操作完成以释放出空闲线程才能继续
CompletableFuture版本的程序似乎比并行流版本的程序还快那么一点儿。但是最后这个版本也不太令人满意。
试图让代码处理九个商店,那么并行流版本耗时3143毫秒,而CompletableFuture版本耗时3009毫秒。
究其原因都一样:它们内部采用的是同样的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime.getRuntime().availableProcessors()的返回值。然而,CompletableFuture具有一定的优势,因为它允许你对执行器(Executor)进行配置,尤其是线程池的大小,让它以更适合应用需求的方式进行配置,满足程序的要求,而这是并行流API无法提供的。
使用定制的执行器
明智的选择似乎是创建一个配有线程池的执行器,线程池中线程的数目取决于你预计你的应用需要处理的负荷
《Java并发编程实战》一书中,Brian Goetz和合著者们为线程池大小的优化提供了不少中肯的建议。这非常重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的情况,处理器的一些核可能就无法充分利用。Brian Goetz建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算
$ N_{thread}=N_{CPU} * U_{CPU} * (1+W/C) $
❏ NCPU是处理器的核的数目,可以通过Runtime.getRuntime().available Processors()得到;
❏ UCPU是期望的CPU利用率(该值应该介于0和1之间);
❏ W/C是等待时间与计算时间的比率。
应用99%的时间都在等待商店的响应,所以估算出的W/C比率为100。意味着如果你期望的CPU利用率是100%,那么你需要创建一个拥有400个线程的线程池。
实际操作中,如果创建的线程数比商店的数目更多,反而是一种浪费,因为这样做之后,线程池中的有些线程根本没有机会被使用。
出于这种考虑,建议将执行器使用的线程数,与需要查询的商店数目设定为同一个值,这样每个商店都应该对应一个服务线程。
不过,为了避免发生由于商店的数目过多导致服务器超负荷而崩溃,还是需要设置一个上限,比如100个线程。
现在正创建的是一个由守护线程构成的线程池。当一个普通线程在执行时,Java程序无法终止或者退出,所以最后剩下的那个线程会由于一直等待无法发生的事件而引发问题。
与此相反,如果将线程标记为守护进程,则意味着程序退出时它也会被回收。这二者之间没有性能上的差异。
可以将执行器作为第二个参数传递给supplyAsync工厂方法
//创建一个可查询指定商品价格的CompletableFuture对象
CompletableFuture.supplyAsync(() -> shop.getName() + " price is " +shop.getPrice(product), executor);
使用CompletableFuture方案的程序处理五个商店仅耗时1021毫秒,处理九个商店时耗时1022毫秒。
一般而言,这种状态会一直持续,直到商店的数目达到我们之前计算的阈值400。
这个例子证明了要创建更适合你的应用特性的执行器,利用CompletableFuture向其提交任务执行是个不错的主意。
处理需大量使用异步操作的情况时,这几乎是最有效的策略。
并行——使用流还是CompletableFuture?
对集合进行并行计算有两种方式:
- 要么将其转化为并行流,利用map这样的操作开展工作,
- 要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。
后者提供了更多的灵活性,可以调整线程池的大小,而这能帮助确保整体的计算不会因为线程都在等待I/O而发生阻塞。
建议
❏ 如果进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。
❏ 反之,如果并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,依据等待/计算,或者W/C的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。
对多个异步任务进行流水线操作
假设所有的商店都同意使用一个集中式的折扣服务。该折扣服务提供了五个不同的折扣代码,每个折扣代码对应不同的折扣率。使用一个枚举型变量Discount.Code来实现这一想法
//以枚举类型定义的折扣代码
public class Discount {public enum Code {NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);private final int percentage;Code(int percentage) {this.percentage = percentage;}}// Discount类的具体实现这里暂且不表示,参见代码清单16-14
}
假设所有的商店都同意修改getPrice方法的返回格式。getPrice现在以ShopName:price:DiscountCode的格式返回一个String类型的值。我们的示例实现中会返回一个随机生成的Discount.Code,以及已经计算得出的随机价格
public String getPrice(String product) {
double price = calculatePrice(product);
Discount.Code code = Discount.Code.values()[
random.nextInt(Discount.Code.values().length)];
return String.format("%s:%.2f:%s", name, price, code);
}
private double calculatePrice(String product) {delay();return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
调用getPrice方法可能会返回像下面这样一个String值:调用getPrice方法可能会返回像下面这样一个String值
实现折扣服务
“最佳价格查询器”应用现在能从不同的商店取得商品价格,解析结果字符串,针对每个字符串,查询折扣服务器的折扣代码。
流程决定了请求商品的最终折扣价格(每个折扣代码的实际折扣比率有可能发生变化,所以每次都需要查询折扣服务)。
//对商店返回字符串的解析操作封装到了下面的Quote类之中
public class Quote {private final String shopName;private final double price;private final Discount.Code discountCode;public Quote(String shopName, double price, Discount.Code code) {this.shopName = shopName;this.price = price;this.discountCode = code;}public static Quote parse(String s) {String[] split = s.split(":");String shopName = split[0];double price = Double.parseDouble(split[1]);Discount.Code discountCode = Discount.Code.valueOf(split[2]);return new Quote(shopName, price, discountCode);}public String getShopName() { return shopName; }public double getPrice() { return price; }public Discount.Code getDiscountCode() { return discountCode; }
}
传递shop对象返回的字符串给静态工厂方法parse,你可以得到Quote类的一个实例,它包含了shop的名称、折扣之前的价格,以及折扣代码。
Discount服务还提供了一个applyDiscount方法,它接受一个Quote对象,返回一个字符串,表示生成该Quote的shop中的折扣价格
使用Discount服务
由于Discount服务是一种远程服务,因此你还需要增加1秒钟的模拟延迟,首先尝试以最直接的方式(坏消息是,这种方式是顺序而且同步执行的)重新实现findPrices,以满足这些新增的需求。
在shop构成的流上采用流水线方式执行三次map操作,我们得到了期望的结果。
❏ 第一个操作将每个shop对象转换成了一个字符串,该字符串包含了该shop中指定商品的价格和折扣代码。
❏ 第二个操作对这些字符串进行了解析,在Quote对象中对它们进行转换。
❏ 最终,第三个map会操作联系远程的Discount服务,计算出最终的折扣价格,并返回包含该价格及提供该价格商品的shop的字符串。
这种实现方式的性能远非最优
:::info
[BestPrice price is 110.93, LetsSaveBig price is 135.58, MyFavoriteShop price is 192.72, BuyItAll price is 184.74, ShopEasy price is 167.28]
Done in 10028 msecs
:::
耗时10s。顺序查询五个商店耗时大约5秒,现在又加上了Discount服务为五个商店返回的价格申请折扣所消耗的5秒钟
- 已知流转换为并行流的方式,非常容易提升该程序的性能
- 也知这一方案在商店的数目增加时,扩展性不好,因为Stream底层依赖的是线程数量固定的通用线程池
- 也知道,通过自定义CompletableFuture调度任务执行的执行器能够更充分地利用CPU资源。
构造同步和异步操作
使用CompletableFuture提供的特性,以异步方式重新实现findPrices方法
只需要将Lambda表达式作为参数传递给supplyAsync工厂方法就可以以异步方式对shop进行查询。
获取
第一个转换的结果是一个Stream<CompletableFuture
对CompletableFuture进行了设置对CompletableFuture进行了设置
解析价格
第二次转换将字符串转变为订单。因为一般情况下解析操作不涉及任何远程服务,也不会进行任何I/O操作,它几乎可以在第一时间进行,所以能够采用同步操作,不会带来太多的延迟。
由于这个原因,对第一步中生成的CompletableFuture对象调用它的thenApply,将一个由字符串转换Quote的方法作为参数传递给它。
直到你调用的CompletableFuture执行结束,使用的thenApply方法都不会阻塞你代码的执行。
这意味着CompletableFuture最终结束运行时,你希望传递Lambda表达式给thenApply方法,将Stream中的每个CompletableFuture对象。
你可以把这看成是为处理CompletableFuture的结果建立了一个菜单,就像曾经为Stream的流水线所做的事儿一样。
为计算折扣价格构造Future
第三个map操作涉及联系远程的Discount服务,为从商店中得到的原始价格申请折扣率。
这一转换与前一个转换又不大一样,因为这一转换需要远程执行(或者,就这个例子而言,需要模拟远程调用带来的延迟),出于这一原因,也希望它能够异步执行。
实现这一目标,像第一个调用传递getPrice给supplyAsync那样,将这一操作以Lambda表达式的方式传递给了supplyAsync工厂方法,该方法最终会返回另一个CompletableFuture对象。
到目前为止,已经进行了两次异步操作,用了两个不同的CompletableFuture对象进行建模,希望能把它们以级联的方式串接起来进行工作。
- ❏ 从shop对象中获取价格,接着把价格转换为Quote。
- ❏ 拿到返回的Quote对象,将其作为参数传递给Discount服务,取得最终的折扣价格。
Java 8的CompletableFutureAPI提供了名为thenCompose的方法
它就是专门为这一目的而设计的,thenCompose方法允许你对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。
换句话说,你可以创建两个CompletableFuture对象,对第一个CompletableFuture对象调用thenCompose,并向其传递一个函数。
当第一个CompletableFuture执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个CompletableFuture的返回做输入计算出的第二个CompletableFuture对象。
使用这种方式,即使Future在向不同的商店收集报价,主线程还是能继续执行其他重要的操作,比如响应UI事件。
将这三次map操作返回的Stream元素收集到一个列表,你就得到了一个List<CompletableFuture
thenCompose方法像CompletableFuture类中的其他方法一样,也提供了一个以Async后缀结尾的版本thenComposeAsync。
通常而言,名称中不带Async的方法和它的前一个任务一样,在同一个线程中运行,而名称以Async结尾的方法会将后续任务提交到一个线程池,所以每个任务是由不同的线程处理的。
就这个例子而言,第二个CompletableFuture对象的结果取决于第一个CompletableFuture,
所以无论使用哪个版本的方法来处理CompletableFuture对象,对于最终的结果,或者大致的时间而言都没有多少差别。
我们选择thenCompose方法的原因是因为它更高效一点,因为它少了很多线程切换的开销。
注意,即便如此,也很难搞清楚到底使用的是哪一个线程,尤其是应用还使用了自己的线程池(譬如Spring),那就更加困难了。
将两个CompletableFuture对象整合起来,无论它们是否存在依赖
对一个CompletableFuture对象调用了thenCompose方法,并向其传递了第二个CompletableFuture,而第二个CompletableFuture又需要使用第一个CompletableFuture的执行结果作为输入。
但是,另一种比较常见的情况是,需要将两个完全不相干的CompletableFuture对象的结果整合起来,而且也不希望等到第一个任务完全结束才开始第二个任务。
应该使用thenCombine方法,它接受名为BiFunction的第二个参数,这个参数定义了当两个CompletableFuture对象完成计算后,结果如何合并。同thenCompose方法一样,thenCombine方法也提供了一个Async的版本。这里,如果使用thenCombineAsync会导致BiFunction中定义的合并操作被提交到线程池中,那么由另一个任务以异步的方式执行。
示例
一家商店提供的价格是以欧元(EUR)计价的,但是你希望以美元的方式提供给你的客户。
可以用异步的方式向商店查询指定商品的价格,同时从远程的汇率服务那里查到欧元和美元之间的汇率。
当二者都结束时,再将这两个结果结合起来,用返回的商品价格乘以当时的汇率,得到以美元计价的商品价格。
用这种方式,需要使用第三个CompletableFuture对象,当前两个CompletableFuture计算出结果,并由BiFunction方法完成合并后,由它来最终结束这一任务
整合的操作只是简单的乘法操作,用另一个单独的任务对其进行操作有些浪费资源,所以只要使用thenCombine方法,无须特别求助于异步版本的thenCombineAsync方法。
创建的多个任务是如何在线程池中选择不同的线程执行的,以及最终的运行结果又是如何整合的。
对Future和CompletableFuture的回顾
CompletableFuture利用Lambda表达式以声明式的API提供了一种机制,能够用最有效的方式,非常容易地将多个以同步或异步方式执行复杂操作的任务结合到一起。
通过向执行器提交一个Callable对象的方式创建了第一个Future对象,向外部服务查询欧元和美元之间的转换汇率。紧接着,创建了第二个Future对象,查询指定商店中特定商品的欧元价格。
在同一个Future中通过查询商店得到的欧元商品价格乘以汇率得到了最终的价格
使用thenCombineAsync,不使用thenCombine,采用第三个Future单独进行商品价格和汇率的乘法运算,效果是几乎相同的。这两种实现看起来没太大区别,原因是你只对两个Future进行了合并。
高效地使用超时机制
读取采用Future计算结果值时,为了避免线程等待结果返回导致的永久阻塞,设定一个超时机制是个不错的主意。Java 9通过CompletableFuture提供了多个方法,可以更加灵活地设置线程的超时机制。
orTimeout在指定的超时到达时,会通过Scheduled-ThreadExecutor线程结束该CompletableFuture对象,并抛出一个TimeoutException异常,它的返回值是一个新的CompletableFuture对象。
凭借这一方法,可以将计算流水线串接起来,发生TimeoutException异常时,反馈一个友好的消息给用户。
有时,如果服务偶然性地无法及时响应,临时使用默认值继续执行也是一种可接受的解决方案。
期望汇率服务1秒钟之内就能返回欧元到美元的兑换汇率。不过,即便请求耗时更长,也不希望程序直接抛出一个异常,让之前的计算开销付之东流。
这种情况下,希望程序可以退化为使用预先定义的汇率。通过Java 9新引入的completeOnTimeout方法,可以轻松地完成这一任务,为程序添加第二种超时机制,
同orTimeout方法一样,completeOnTimeOut方法也返回一个CompletableFuture,可以将它与其他的CompletableFuture方法链接起来。
简短地回顾一下,目前我们已经能配置两种类型的超时:一种是如果程序执行超时,譬如超过3秒,整个计算都会失败;另一种是如果程序执行超时,譬如超过1秒,还可以使用预定义的默认值继续执行,不会发生失效。
CompletableFuture自身执行完毕之前,调用它的get或者join方法,执行都会被阻塞。、
响应CompletableFuture的completion事件
//一个模拟生成0.5秒至2.5秒随机延迟的方法
private static final Random random = new Random();
public static void randomDelay() {int delay = 500 + random.nextInt(2000);try {Thread.sleep(delay);} catch (InterruptedException e) {throw new RuntimeException(e);}
}
只要有商店返回商品价格就在第一时间显示返回值,不再等待那些还未返回的商店(有些甚至会发生超时)。
避免问题
等待创建一个包含了所有价格的List创建完成。你应该做的是直接处理CompletableFuture流,这样每个CompletableFuture都在为某个商店执行必要的操作
//重构findPrices方法返回一个由Future构成的流
public Stream<CompletableFuture<String>> findPricesStream(String product) {
return shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}
为findPricesStream方法返回的Stream添加了第4个map操作,在此之前,已经在该方法内部调用了三次map。
这个新添加的操作其实很简单,只是在每个CompletableFuture上注册一个操作,该操作会在CompletableFuture完成执行后使用它的返回值。
Java 8的CompletableFutureAPI通过thenAccept方法提供了这一功能,它接受CompletableFuture执行完毕后的返回值做参数。
在这里的例子中,该值是由Discount服务返回的字符串值,它包含了提供请求商品的商店名称及折扣价格
findPricesStream("myPhone").map(f -> f.thenAccept(System.out::println));
和之前看到的thenCompose和thenCombine方法一样,thenAccept方法也提供了一个异步版本,名为thenAcceptAsync。
thenAccept方法已经定义了如何处理CompletableFuture返回的结果,一旦CompletableFuture计算得到结果,它就返回一个CompletableFuture
因此,map操作返回的是一个Stream<CompletableFuture
还希望能给最慢的商店一些机会,让它有机会打印输出返回的价格。为了实现这一目的,把构成Stream的所有CompletableFuture
//响应CompletableFuture的completion事件
CompletableFuture[] futures = findPricesStream("myPhone")
.map(f -> f.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
allOf工厂方法接受一个由CompletableFuture构成的数组,数组中的所有CompletableFuture对象执行完成之后,它返回一个CompletableFuture
意味着,如果你需要等待最初Stream中的所有CompletableFuture对象执行完毕,那么对allOf方法返回的CompletableFuture执行join操作是个不错的主意。
另一些场景中,你可能希望只要CompletableFuture对象数组中有任何一个执行完毕就不再等待,比如,你正在查询两个汇率服务器,任何一个返回了结果都能满足你的需求。在这种情况下,使用一个类似的工厂方法anyOf。该方法接受一个CompletableFuture对象构成的数组,返回由第一个执行完毕的CompletableFuture对象的返回值构成的CompletableFuture