5、提升Java的并发性

news/2025/1/15 12:38:04/文章来源:https://www.cnblogs.com/lehoso/p/18672782

CompletableFuture及反应式编程背后的概念

:::info
❏线程、Future以及推动Java支持更丰富的并发API的进化动力

❏ 异步API

❏ 从“线框与管道”的角度看并发计算

❏ 使用CompletableFuture结合器动态地连接线框❏ 构成Java 9反应式编程Flow API基础的“发布-订阅”协议❏ 反应式编程和反应式系统

:::

聚合模型

fork/join框架以及并行流都是非常有价值的并行处理工具。它们将一个任务切分为多个子任务,并将这些子任务分配到不同的核、CPU或者机器上去以并行的方式执行

处理并发而非并行任务,或者主要目标是在同一个CPU上执行多个松耦合的任务,要考虑的是在等待(很可能是很长的一段时间)远程服务的结果或者查询数据库时,尽可能地让这些核都忙起来,从而最大化应用的吞吐量,尽量避免线程阻塞和浪费宝贵的计算资源。

Java提供了两个主要的工具集。第一个就是Future接口,尤其是Java 8中提供的CompletableFuture,这通常是既简单又有效的解决方案。Java 9又新增了对反应式编程的支持,它基于Flow API实现了所谓的“发布-订阅”协议,使用它能提供更加细粒度的程序控制。

并发与并行这两种算法的差异。并发是一种编程属性(重叠地执行),即使在单核的机器上也可以执行,而并行是执行硬件的属性(同时执行)。

为支持并发而不断演进的Java

Java从一开始就提供了锁(通过synchronized类和方法)、Runnable以及线程。2004年,Java 5又引入了java.util.concurrent包,它能以更具表现力的方式支持并发,特别是ExecutorService[插图]接口(将“任务提交”与“线程执行”解耦)、Callable以及Future,后两者使用泛型(也是从Java 5首次引入)生成一个高层封装的Runnable或Thread变体,可以返回执行结果。ExecutorService既可以执行Runnable也可以执行Callable。这些新特性促进了多核CPU上并行编程的发展。

Java 7为了使用fork/join实现分而治之算法,新增了java.util.concurrent.RecursiveTask, Java 8则增加了对流和流的并行处理(依赖于新增的Lambda表达式)的支持。

通过支持组合式的Future(基于Java 8 CompleteFuture实现的Future), Java进一步丰富了它的并发特性,Java 9提供了对分布式异步编程的显式支持。这些API为构建本章前面介绍的那种聚合型应用提供了思路和工具。在这种架构中,应用通过与各种网络服务通信,替用户实时整合需要的信息,或者将整合的信息作为进一步的网络服务提供出去。这种工作方式被称为反应式编程。Java 9通过“发布-订阅”协议(更具体地说,通过java.util.concurrent.Flow接口)增加了对它的支持。CompletableFuture及java.util.concurrent.Flow的关键理念是提供一种程序结构,让相互独立的任务尽可能地并发执行,通过这种方式最大化地利用多核或者多台机器提供的并发能力。

线程以及更高层的抽象

单CPU的计算机能支持多个用户,因为操作系统为每个用户创建了一个进程。操作系统为这些进程分配了相互独立的虚拟地址空间,这样每个用户都感觉他是在独占使用这台计算机。操作系统通过分时唤醒的方式让多个进程共享CPU资源,进一步地强化了这种假象。一个进程可以请求操作系统给它分配一个或多个线程——它们和主进程之间共享地址空间,因此可以并发地执行任务并相互协调。

在一个多核的环境中,单用户登录的笔记本电脑上可能只启动了一个用户进程,这种程序永远不能充分发挥计算机的处理能力,除非使用多线程。虽然每个核可以服务一个或多个进程或线程,但是如果程序并未使用多线程,那它同一时刻能有效使用的只有处理器众多核中的一个。

譬如有个四核CPU的机器,如果安排合理,让每个CPU核都持续不停地执行有效的任务,理论上程序的执行速度应该是单核CPU执行速度的四倍(当然,程序调度也会有开销,所以实际达不到这么多)。举个例子,假如有一个容量为1000000个数字大小的数组,其中保存了学生给出的正确答案的数目。下面的程序运行在单个线程上(该程序在单核年代运行得很顺畅):

long sum = 0;
for (int i = 0; i < 1_000_000; i++) {sum += stats[i];
}
//将上面的程序与使用四个线程的版本进行比较,其中第一个线程执行:
long sum0 = 0;
for (int i = 0; i < 250_000; i++) {sum0 += stats[i];
}
//第四个线程执行:
long sum3 = 0;
for (int i = 750_000; i < 1_000_000; i++) {sum3 += stats[i];
}
//这四个线程在main程序中通过Java的.start()方法启动,使用.join()等待其执行完成,最后执行计算:
sum = sum0 + ... + sum3;
//使用Java的Stream轻而易举地通过内部迭代而非外部迭代(显式的循环)实现并行:
sum = Arrays.stream(stats).parallel().sum();

并行流的迭代是比显式使用线程更高级的概念。换句话说,使用流(Stream)是对一种线程使用模式的抽象。将这种抽象引入流就像使用一种设计模式,带来的好处是程序员不再需要编写枯燥的模板代码了,库中的实现隐藏了代码大部分的复杂性。使用自Java 7才支持的java.util.concurrent.RecursiveTask,它对线程的fork/join进行了抽象,可以并发地执行分而治之算法,用一种更高级的方式在多核机器上高效地执行数组求和计算.

执行器和线程池

Java 5提供了执行器框架,其思想类似于一个高层的线程池,可以充分发挥线程的能力。执行器使得程序员有机会解耦任务的提交与任务的执行。

  1. 线程问题

操作系统(以及Java)的线程数都远远大于硬件线程数,因此即便一些操作系统线程被阻塞了,或者处于睡眠状态,所有的硬件线程还是会被完全占据,繁忙地执行着指令。举个例子,2016年英特尔公司生产的酷睿i7-6900K服务器处理器有八个核,每个核上有两个对称多处理(SMP)的硬件线程,这样算下来就有16个硬件线程。服务器上很可能有好多个这样的处理器,最终一台服务器上可能有64个硬件线程。与此相反,笔记本电脑可能就只有一个或者两个硬件线程,因此,移植程序时,不能想当然地假设可以使用多少个硬件线程。而某个程序中Java线程的最优数目往往依赖于硬件核的数目。

  1. 线程池的优势

Java的ExecutorService提供了一个接口,用户可以提交任务并获取它们的执行结果。期望的实现是使用newFixedThreadPool这样的工厂方法创建一个线程池

ExecutorService newFixedThreadPool(int nThreads)

这个方法会创建一个包含nThreads(通常称为工作线程)的ExecutorService,新创建的线程会被放入一个线程池,每次有新任务请求时,以先来先到的策略从线程池中选取未被使用的线程执行提交的任务请求。任务执行完毕之后,这些线程又会被归还给线程池。这种方式的最大优势在于能以很低的成本向线程池提交上千个任务,同时保证硬件匹配的任务执行。此外,你还有一些选项可以对ExecutorService进行配置,譬如队列长度、拒绝策略以及不同任务的优先级等。请注意这里使用的术语:程序员提供任务(它可以是一个Runnable或者Callable),由线程负责执行

  1. 线程池的不足

大多数情况下,使用线程池都比直接操纵线程要好,不过也需要特别留意使用线程池的两个陷阱

❏** 使用k个线程的线程池只能并发地执行k个任务**。提交的任务如果超过这个限制,线程池不会创建新线程去执行该任务,这些超限的任务会被加入等待队列,直到现有任务执行完毕才会重新调度空闲线程去执行新任务。通常情况下,这种工作模式运行得很好,它可以一次提交多个任务,而不必随机地创建大量的线程。

然而,采用这种方式时需要特别留意任务是否存在会进入睡眠、等待I/O结束或者等待网络连接的情况。一旦发生阻塞式I/O,这些任务占用了线程,却会由于等待无法执行有价值的工作。

  1. 假设你的CPU有4个硬件线程,创建的线程池大小为5,你一次性提交了20个执行任务。希望这些任务会并发地执行,直到所有20个任务执行完毕。
  2. 假设首批提交的线程中有3个线程进入了睡眠状态或者在等待I/O,那就只剩2个线程可以服务剩下的15个任务了。如此一来,只能取得你之前预期吞吐量的一半(如果创建的线程池中工作线程数为8,那么还是能取得同样预期吞吐量的)。如果早期提交的任务或者正在执行的任务需要等待后续任务,而这也正是Future典型的使用模式,那么可能会导致线程池死锁。

:::info
这里希望牢记的是,尽量避免向线程池提交可能阻塞(譬如睡眠,或者要等待某个事件)的任务,然而这一点在遗留系统中可能无法避免。

:::

❏ 通常情况下,Java从main返回之前,都会等待所有的线程执行完毕,从而避免误杀正在执行关键代码的线程。因此,实际操作时的一个好习惯是在退出程序执行之前,确保关闭每一个线程池(因为线程池中的工作线程在创建完后会由于要等待另一个任务执行完毕而无法正常终止)。实践中,经常使用一个长时间运行的ExecutorService管理需要持续运行的互联网服务。

其他的线程抽象:非嵌套方法调用

无论什么时候,任何任务(或者线程)在方法调用中启动时,都会在其返回之前调用同一个方法。换句话说,线程创建以及与其匹配的join()在调用返回的嵌套方法调用中都以嵌套的方式成对出现。这种思想被称为严格fork/join

一种更加松散的形式组织fork/join,这种方式下子任务从内部方法调用中逃逸出来,在外层调用中执行join,这样提供给用户的接口看起来还是一个普通调用

多种多样的并发形态,其中用户的方法调用创建的线程(或者派生的任务)可能比该调用方法的生命周期还长

这种类型的方法常常被称作异步方法,它的名字源于该方法所派生的任务会继续执行调用方法希望它完成的工作

采用这种方法会有哪些潜在的危害。

❏ 子线程与执行方法调用的代码会并发执行,因此为了避免出现数据竞争,编写代码时需要特别小心。

❏ 如果Java的main()方法在子线程终止之前返回,会发生什么情况?

有两种可能性,然而它们都不是我们期望的。

■ 等待所有的线程都执行完毕,再退出主应用的执行。

问题:可能由于等待一个一直无法顺利结束的线程,最终导致应用崩溃

■ 直接杀死所有无法正常终止的线程,然后退出程序的执行。

有可能中断一个写磁盘的I/O序列,导致外部数据出现不一致的现象

为了避免这些问题,需确保程序能有效地跟踪它创建的线程,且退出程序运行(包括线程池的关闭)之前必须加入这些线程。依据有没有执行setDaemon()方法,Java线程可以被划分为守护进程以及非守护进程。守护进程的线程在退出时就被终止(因此特别适合作为服务,因为它不会导致磁盘数据不一致),而从主程序返回的线程还得继续等待,直到所有非守护线程都终止了,应用才能退出执行。

同步及异步API

内部迭代(通过Stream提供的方法)替换外部迭代(显式的for循环)。接着,你可以使用parallel()方法对流进行处理,流中的元素会被Java运行时并发地处理,不再需要使用复杂的线程创建操作重写每一个循环

        class ThreadExample {public static void main(String[] args) throws InterruptedException {int x = 1337;Result result = new Result();Thread t1 = new Thread(() -> { result.left = f(x); } );Thread t2 = new Thread(() -> { result.right = g(x); });t1.start();t2.start();t1.join();t2.join();System.out.println(result.left + result.right);}private static class Result {private int left;private int right;}}

使用Future API而不是Runnable进一步简化。之前已经建立了一个名为ExecutorService的线程池(譬如executorService)

        public class ExecutorServiceExample {public static void main(String[] args)throws ExecutionException, InterruptedException {int x = 1337;ExecutorService executorService = Executors.newFixedThreadPool(2);Future<Integer> y = executorService.submit(() -> f(x));Future<Integer> z = executorService.submit(() -> g(x));System.out.println(y.get() + z.get());executorService.shutdown();}}
//显式调用submit时使用的模板代码的污染

API由同步API变为异步API。这种方式下,方法不再在物理返回其调用者的同时返回它的执行结果,被调用函数可以在返回结果就绪之前物理上提前返回调用函数

Future风格的API

        Future<Integer> f(int x);Future<Integer> g(int x);
//改Future<Integer> y = f(x);Future<Integer> z = g(x);System.out.println(y.get() + z.get());

其思想是方法f会返回一个Future对象,该对象包含一个继续执行方法体中原始内容的任务,不过方法执行完f后会立刻返回,不会等待执行结果就绪。类似地,方法g也返回一个Future对象,第三行代码使用了一个get()方法等待这两个Future执行完毕,并计算它们的结果之和

保持方法g的API调用不变,仅在方法f中引入Future,并且不会降低其并发度。然而,对于大型的程序,建议你不要这样做,原因有两个。

❏ 其他使用函数g的地方可能也需要Future风格的版本,因此你最好使用统一的API风格。

❏ 为了充分发挥并行硬件的处理能力,以使程序运行得又快又好,将程序切分成更多粒度更细的任务是很有帮助的(当然也要控制在合理的范围之内)

反应式风格的API

核心思想是通过修改f和g的函数签名来使用回调风格的编程

void f(int x, IntConsumer dealWithResult);
//需要额外向f函数传递一个回调函数(其实是一个Lambda表达式)作为参数,
//f函数会在函数体中衍生一个任务,这个任务会在结果可用时使用它执行Lambda表达式,
//这样一来就不需要使用return返回值了。
//强调f函数衍生出执行函数体的任务后就立刻返回
public class CallbackStyleExample {public static void main(String[] args) int x = 1337;Result result = new Result();f(x, (int y) -> {result.left = y;System.out.println((result.left + result.right));} );g(x, (int z) -> {result.right = z;System.out.println((result.left + result.right));});}}

代码在打印输出正确结果(函数f和g调用之和)之前,打印输出的是最快拿到的值(偶尔还会打印输出两次计算的和,因为这段代码没有加锁,+的两个操作数既可以在打印输出执行之前更新,也可以在打印输出执行之后更新)。

解决这个问题有两种途径。

❏ 可以添加if-then-else判断,确定这两回调函数都已经执行完毕后再调用println打印输出和。为了达到这一目标,可能还需在恰当的位置添加锁。

❏ 还可以使用反应式风格的API。这种API主要适用于事件序列的处理,而非单一的结果。针对单一结果,采用Future可能更加合适。

:::info
反应式编程允许方法f和g多次调用它们的回调函数dealWithResult。而原始版本的f和g使用return返回结果,return只能被调用一次。Future与此类似,它也只能完成一次,执行Future的计算结果可以通过get()方法获取。某种程度上,反应式风格的异步API天然更适合于处理一系列的值(稍后会将它与流对比),而Future式的API更适合作为一次性处理的概念框架

:::

有害的睡眠及其他阻塞式操作

应用与用户或者其他应用交互时,往往需要限制事件发生的频率,一种很自然的方式是使用sleep()方法。

睡眠线程依旧会占用系统资源。如果睡眠的线程数目不多,一般没什么问题,有大量的线程处于睡眠状态需要解决

:::info
牢记的一点是,线程池中的任务即便是处于睡眠状态,也会阻塞其他任务的执行(它们无法停止已经分配了线程的任务,因为这些任务的调度是由操作系统管理的)。

:::

能阻塞线程池中可用线程执行的不仅仅只有睡眠。任何阻塞式操作都会产生同样的效果。

阻塞式操作可以分为两类:

一类是等待另一个任务执行,譬如调用Future的get()方法;

另一类是等待与外部交互的返回,譬如从网络、数据库服务器或者键盘这样的人机接口读取数据

更理想的方法是将你的任务切分成两部分——“之前”与“之后”——仅在程序执行未被阻塞时才由Java来调度“之后”部分的执行

假设以下两个任务都在线程池中执行。——A

它会被加入线程池的执行队列,之后开始执行。执行过程中,它被sleep调用阻塞,占用了工作线程10秒钟的时间,期间没有执行任何任务。接着它开始执行work2(),执行结束后释放工作线程

且在其睡眠期间占用了宝贵的线程时间——B

首先执行work1(),然后被终止——不过终止之前它会调度等待队列中的任务先执行work2() 10秒钟

调度执行了队列中的另一个任务(同时也消耗了几个字节的内存,然而并没有创建新的线程)

:::danger
创建任务时应该牢记于心的。任务在执行时会占用宝贵的系统资源,因此,你的目标是让它们持续地处于运行状态,直至其执行完毕,或者释放出使用的资源。任务在提交完后续任务后应该终止执行,而不是被阻塞


同样应用于I/O。任务启动读方法调用,或者读结束终止读方法调用,请求运行时库调度一个后续任务,都应该使用非阻塞操作,尽量不要使用传统的阻塞式读取

:::

线程数量是无限的,并且创建线程的开销可以忽略不计的话,那么代码A和代码B都是不错的解决方案。然而并非如此,只要你的任务中有线程可能进入睡眠状态,或者会被阻塞,这种情况下代码B无疑是更好的方案

验证

设计一个新系统

:::danger
能充分利用并行硬件的处理能力,那么把它设计成大量小型、并发的任务

同时以异步调用的方式实现所有可能阻塞的操作很可能是最理想的途径。

然而,这种“全异步”(everything asynchronous)的设计模式可能并不符合项目实际情况

Netty,它提供了用于创建网络服务器的统一的阻塞/非阻塞API

:::

如何使用异步API进行异常处理

无论是基于Future的异步API还是反应式异步API,被调方法的概念体(conceptual body)都在另一个线程中执行,调用方很可能已经退出了执行,不在调用异常处理器的作用域内。

很明显,这种非常规行为触发的异常需要通过其他的动作来处理

Future的CompletableFuture实现中包含的get()方法可以返回异常的信息,还可以通过像exceptionally()这样的方法进行异常恢复

对于反应式异步API,需要修改接口以引入额外的回调函数,这个回调函数会在触发异常时被调用,其方式就像不使用return返回,而是执行设定的回调函数一样。为了实现这种设计,需要在反应式API中使用多个回调函数

void f(int x, Consumer<Integer> dealWithResult,Consumer<Throwable> dealWithException);
//接着函数f的函数体可能会执行:
dealWithException(e);

如果有多个回调函数,将它们等价地封装成单一对象中的方法来传递一个对象而不是传递多个回调函数。

譬如,Java 9的Flow API就将多个回调函数封装成了一个对象(即Subscriber类,它包含了四个回调函数形式的方法

void     onComplete()
void     onError(Throwable throwable)
void     onNext(T item)

相互独立的回调函数代表了不同的含义,譬如值可以访问了(onNext)、获取值时发生了异常(onError),或者程序收到信号接下来没有新的数据(或者异常),此时就会调用onComplete函数

void f(int x, Subscriber<Integer> s);//f函数体现在借助执行下面的操作,以Throwable t的形式表示了一个异常
s.onError(t);

线框-管道——模型

设计和理解并发系统最好的方式是使用图形。

将这种技术称为线框-管道(box-and-channel)模型。

设想一个使用整型的简单场景,希望对之前计算f(x)+g(x)的例子做一个归纳。

现在想使用参数x调用方法(或函数)p,并将计算的结果作为参数传递给函数q1和q2,接着使用这两个调用的结果去调用方法(或函数)r,然后打印结果(为了避免混乱,这里不再区分类C的方法m以及它关联的函数C::m)。

//第一种方式是
/*
起来很清晰,不过Java会顺次执行对q1和q2的调用,而这是你希望避免的,
因为你的目标是要充分利用硬件的并行处理能力
*/
int t = p(x);
System.out.println( r(q1(t), q2(t)) );//另一种方法是使用Future并行地执行方法f和g:
/*
由于“线框-管道”图的形状,这个例子中并未用Future封装p和r。
p需要在其他所有任务之前完成,而r需要在其他所有任务之后执行
*/
int t = p(x);
Future<Integer> a1 = executorService.submit(() -> q1(t));
Future<Integer> a2 = executorService.submit(() -> q2(t));
System.out.println( r(a1.get(), a2.get()));/**
这段代码中,我们需要用Future封装五个使用的函数(p、q1、q2、r和s)才能获得最大程度的并发
*/
System.out.println( r(q1(t), q2(t)) + s(x) );

:::info
这一方案在系统并发度不大的情况下工作得很好。但如果系统变得越来越大,

带有很多相互独立的“线框-管道”图,甚至有些线框内部还使用了自己的线框和管道会怎样呢?这种情况下,大量的任务(由于调用了get()方法)会处于等待Future结束的状态,导致最终无法充分发挥硬件的并发处理能力,甚至出现死锁。

此外,要深入理解这么大规模系统的结构才能确定多少任务容易由于执行get()处于等待状态,而这是非常困难的。Java 8的解决方案是使用结合器CompletableFuture

可以使用compose()和andThen()这样的方法将两个方法合成一个新的方法

:::

假设方法add1的功能是将l和一个整型数相加,而dble可以倍增一个整型数,那么你可以编写下面的代码,创建一个函数对它的参数执行倍增操作,并将计算结果与l求和返回

Function<Integer, Integer> myfun = add1.andThen(dble);
/*“线框-管道”图也可以直接使用结合器实现,效果同样不错。
可以借助Java的Function p、q1、q2以及BiFunction r简洁地表示
*/
p.thenBoth(q1, q2).thenCombine(r)
//无论是thenBoth还是thenCombine,其形式都不属于Java的Function或BiFunction类

:::info
线框-管道”模型可以帮助你梳理思路和代码。

某种程度上,提升了构建大型系统的抽象层次。

通过画线框(或者在程序中使用结合器)表达希望执行的计算,接着该线框被执行,这种方式比你直接手写计算任务可能高效不少。

结合器不仅适合数学计算,也适合Future和反应式数据流

:::

为并发而生的CompletableFuture和结合器

Future接口的一个问题是它是一个接口,需要思考如何设计并发代码结构才能采用Future实现任务。

历史上,除了FutureTask这一实现之外,Future也提供了其他几个动作:创建一个Future指定它执行某个计算任务,执行任务,等待执行终止,等等。新版Java提供了更多结构的支持(譬如RecursiveTask)。

Java 8为带来是对组合式Future的支持。使用Future接口的Completable-Future实现,可以创建组合式的Future

普通的Future通常是通过一个Callable创建的,执行完毕后,使用get()获得执行的结果。而CompletableFuture允许用户创建一个未指定运行任何代码的Future对象,之后由complete()方法指定其他的线程和值(这里是变量名)完成任务的执行,这样get()方法就能获得返回值

        public class CFComplete {
@@public static void main(String[] args)throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(10);int x = 1337;
@@CompletableFuture<Integer> a = new CompletableFuture<>();executorService.submit(() -> a.complete(f(x)));int b = g(x);System.out.println(a.get() + b);executorService.shutdown();}}//OR public class CFComplete {public static void main(String[] args)throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(10);int x = 1337CompletableFuture<Integer> a = new CompletableFuture<>();executorService.submit(() -> b.complete(g(x)));int a = f(x);System.out.println(a + b.get())executorService.shutdown();}
}

这两种代码实现都会浪费处理资源,因为有线程执行get()调用而阻塞——前一段代码中的f(x)可能占用较长的时间,后一段代码中g(x)可能占用较长的时间。使用Java 8的CompletableFuture能解决这个问题

:::danger
两个活跃线程分别执行着f(x)和g(x),第一个线程执行完毕时,立刻启动一个新的线程返回计算的结果。

怎样才能编写任务,充分利用线程的处理能力

答案是,你可以让第一个任务执行f(x),第二个任务执行g(x),第三个任务(它既可以是一个新的线程,也可以复用现存线程中的一个)计算二者之和。

如果第三个任务在前两个任务结束之前不能开始执行。解决方案是使用Future的组合操作。

:::

组合操作是一种强大的程序构造思想,存在于多种语言之中。伴随着Java 8Lambda表达式的引入。组合思想的一个例子是在Stream上操作的组合,如下所示

myStream.map(...).filter(...).sum()

这一思想的另一个实例是你可以对两个函数使用compose()和andThen(),生成一个新的函数。使用CompletableFuture的thenCombine方法对你的两个计算结果求和显然更好

//thenCombine的方法签名如下
//(为了避免被泛型和通配符搞晕,这里进行了一些简化
CompletableFuture<V> thenCombine(CompletableFuture<U> other,BiFunction<T, U, V> fn)/**
这个方法接受两个CompletableFuture值(返回结果类型分别是T和U)​,
并创建一个新值(返回结果类型为V)​。前两个值执行结束时,
它取得其执行结果,并将结果传递给fn处理,
完成返回结果Future的构造,整个过程都没有阻塞发生
*/
public class CFCombine {public static void main(String[] args) throws ExecutionException,InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(10);int x = 1337;CompletableFuture<Integer> a = new CompletableFuture<>();CompletableFuture<Integer> b = new CompletableFuture<>();CompletableFuture<Integer> c = a.thenCombine(b, (y, z)-> y + z);executorService.submit(() -> a.complete(f(x)));executorService.submit(() -> b.complete(g(x)));System.out.println(c.get());executorService.shutdown();}
}
/**
thenCombine这一行代码非常关键:
它在完全不了解Future对象a和b要执行什么计算任务的前提下,
在线程池中创建了一个计划执行的新计算任务。
这个新的执行任务只在前两个执行任务完成之后才会被启动。
第三个执行任务c会对前两个执行任务的结果进行求和,
​(最重要的是)
它直到前两个执行任务执行完毕之后才被授权可以在线程上执行,
而不是一开始就启动执行,然后阻塞等待前两个线程执行结束。
因此,这种设计实际不存在等待的操作,
而之前两个版本的代码都有阻塞等待的问题
**/

之前两个版本的代码都有阻塞等待的问题。前两个版本中,如果Future中的计算任务先完成的是第二个,那么线程池中的两个线程都会处于活动状态,即便你这时只需要一个线程执行计算任务

f(x)、g(x)以及结果求和这三个计算的时间序列图

前两个版本中,计算y+x都在固定的线程中,要么是计算f(x)的线程,要么是计算g(x)的线程——二者之间都可能发生等待。与此相反,采用thenCombine调度求和计算,它只会在f(x)和g(x)都完成之后才进行。

对很多程序而言,并不关心少数的线程由于调用了get()方法会被阻塞,因此Java 8之前的Future依然是一种有价值的编程选择。

然而,如果需要处理大量的Future对象(譬如处理大量的服务请求),那么在这种情况下,避免由于调用get()产生的阻塞、并发性的损失甚至是死锁,使用CompletableFuture以及结合器通常是最佳的选择

发布-订阅 及 反应式编程

Future和CompletableFuture的思维模式是计算的执行是独立且并发的。使用get()方法可以在执行结束后获取Future对象的执行结果。因此,Future是一个一次性对象,它只能从头到尾执行代码一次。

:::danger
与此相反:反应式编程的思维模式是类Future的对象随着时间的推移可以产生很多的结果

:::

举例:

假设你要处理一个温度计对象。你的期望是,温度计对象会持续不断地生成结果,以每隔几秒的频率为你提供温度数据。另一个例子是Web服务器的监听组件对象。该组件监听来自网络的HTTP请求,并根据请求的内容返回相应的数据。接着,其他的代码会对结果数据进行处理,譬如温度或者HTTP请求中的数据。然后温度计和监听对象会继续检测温度、监听请求,直到有新的数据到来,周而复始。

这里有两点需要注意。核心的一点是,这些例子与Future非常像,然而它们可以完成(或产生)多次,而非一次性的操作。另一点是,第二个例子中,之前收到的结果与之后收到的结果可能都重要,而对温度计而言,大多数用户只关心最近的温度。但是,为什么要把这种编程叫作反应式的呢?答案是,程序的另一部分可能需要对低温报告做出反应,比如打开加热器

:::danger
反应式编程的模式更具表现力

一个Java流只能由一个终端操作使用

:::

流的编程模式让它很难表达一些类似流的操作,譬如将一个序列值进行切分,交由两个流水线(就像fork那样)来处理;或者处理和整合来自两个相互独立的流中的元素(就像join那样)。流支持的都是线性处理的流水线。

Java 9使用java.util.concurrent.Flow提供的接口对反应式编程进行建模,实现了名为“发布-订阅”的模型(也叫协议,简写为pub-sub)。

三个主要概念:

订阅者可以订阅的发布者

❏ 名为订阅的连接;

消息(也叫事件),它们通过连接传输。

订阅(subscription)就像是管道,发布者和订阅者类似于线框上的端口。

多个组件可以向同一个发布者订阅,

一个组件既可以发布多个相互独立的流,也可以向多个发布者订阅

示例两个流求和

“发布-订阅”的一个简单却典型的例子是整合两个信息源的事件并发布给其他用户使用。

概念上把它想象成一个电子表格。

假设电子表格中的一个单元格包含着公式。

我们对电子表格的单元格C3进行建模,该单元格包含了公式“=C1+C2”。只要C1或者C2被更新(无论是有人对它进行了更新,还是因为该表格包含了其他的公式), C3也会更新以反映这些变化。

假设下面的代码中,唯一可用的操作就是对单元格的值进行求和。

//对保存值的单元格进行建模
private class SimpleCell {private int value = 0;private String name;public SimpleCell(String name) {this.name = name;}
}//初始化几个单元格
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");/**
怎样才能指定当C1或C2的值发生变化时,
C3会对这两个值重新进行求和计算呢?
你需要一个途径让C3可以订阅C1和C2的事件。
为了达到这一目标,我们引入了接口Publisher<T>,
它的核心代码看起来像下面这样
**/
interface Publisher<T> {void subscribe(Subscriber<? super T> subscriber);
}
/**接口接受一个它可以通信的订阅者作为参数
Subscriber<T>接口提供了一个简单的方法onNext,
它接受信息作为参数
**/
interface Subscriber<T> {void onNext(T t);
}

如何融合?

单元格实际上既是一个发布者(它可以向其他单元格发布自己的事件)也是一个订阅者(需要依据其他单元格的事件进行响应)。Cell类的实现如下所示

Simplecell c3 = new SimpleCell("C3");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");c1.subscribe(c3);c1.onNext(10); // 更新C1 的值为10
c2.onNext(20); // 更新C2 的值为20

ArithmeticCell c3 = new ArithmeticCell("C3");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");c1.subscribe(c3::setLeft);
c2.subscribe(c3::setRight);c1.onNext(10); // 更新C1 的值为10
c2.onNext(20); // 更新C2 的值为20
c1.onNext(15); // 更新C1 的值为15//print
C1:10
C3:10
C2:20
C3:30
C1:15
C3:35/**当C1更新为15时,C3会立刻进行响应,也同步更新它的值。
发布者-订阅者交互的奇妙之处
在于你可以建立发布者与订阅者之间的一幅图
**/

创建另一个单元格C5,通过表达式“C5=C3+C4”,可以指定它依赖于C3和C4

//ArithmeticCell c5 = new ArithmeticCell("C5");
ArithmeticCell c3 = new ArithmeticCell("C3");
SimpleCell c4 = new SimpleCell("C4");
SimpleCell c2 = new SimpleCell("C2");
SimpleCell c1 = new SimpleCell("C1");c1.subscribe(c3::setLeft);
c2.subscribe(c3::setRight);c3.subscribe(c5::setLeft);
c4.subscribe(c5::setRight);//更新
c1.onNext(10); // 更新C1 的值为10
c2.onNext(20); // 更新C2 的值为20
c1.onNext(15); // 更新C1 的值为15
c4.onNext(1); // 更新C4 的值为1
c4.onNext(3); // 更新C4 的值为3//这些动作产生了下面的输出
C1:10
C3:10
C5:10
C2:20
C3:30
C5:30
C1:15
C3:35
C5:35
C4:1
C5:36
C4:3
C5:38//最终C5的值是38,因为C1是15, C2是20,而C4是3

:::danger
术语(发布—订阅 核心思想

由于数据的流动是从发布者(生产者)流向订阅者(消费者),

因此程序员经常使用诸如向上流(upstream)和向下流(downstream)这样的术语。

前面的示例代码中,

向上流onNext()方法接收的数据newValue是由notifyAllSubscribers()方法传递给向下游的onNext()方法的。

:::

使用流进行编程时,希望发送信号给对象,而不是处理一个onNext事件,因此订阅者(监听者)需要定义onError和onComplete方法。这样一来,发布者才有机会告诉订阅者发生了异常,并终止数据流的发送(譬如,温度计的例子中,温度计可能被替换了,再也无法通过onNext方法返回更多的数据)。Java 9 Flow API中的Subscriber接口提供了对onError和onComplete方法的支持。这些方法是“发布-订阅”协议比传统的观察者模式更加强大的原因之一。

两个简单却重要的概念,即压力背压,极大地丰富了Flow接口。对能否充分利用线程的处理能力影响很大

压力

还是以温度计为例,假设它之前以每隔几秒钟的频率返回温度数据,之后温度计进行了升级,能以更高的频率提供数据信息,譬如每毫秒报告一次温度数据。

  • 程序能以足够快的速度响应这些事件么?
  • 遭遇这种情况会不会发送缓冲区溢出,甚至是程序崩溃(回想一下我们之前碰到的问题场景,线程池一下涌入大量要处理的任务,同时又有一些任务被阻塞)?
  • 类似地,假设向一个发布者订阅了服务,该服务会为提供SMS消息服务,将SMS消息推送到手机上。
  • 这项订阅刚开始可能工作得很好,因为新手机上只有有限的几条SMS消息,几年之后,SMS消息的数量已经累积到数以千计的规模,这时候会发生什么情况呢?
  • 这些消息可能在一秒钟内调用onNext发送完毕么?

这种情况通常被称作压力


假设有一个垂直的管道,其中装着标记了消息的球。你还需要一种“背压”机制,譬如限制多少球可以被加入圆筒。背压在Java 9的Flow API中是通过request()方法实现的。request()方法定义在一个新的接口Subscription中,该方法邀请发布者按照约定的数量发送下一次的元素,而不是以无限的速率发送元素(即采用拉模式,而不是推模式)。

背压

向Publisher传递一个Subscriber对象(它包含了onNext、onError和OnComplete方法),Publisher会在恰当的时候调用该对象。这个对象被用于在Publisher与Subscriber之间传递信息。通过背压(流量控制),可以限制信息传输的速率。

在此之前,需要通过Subscriber向Publisher发送相关的限制信息。还需要解决一个问题,一个Publisher可能有多个Subscriber。希望设置的背压只对点对点的连接生效,不影响其他的连接

//解决这个问题,Java 9 Flow API中的Subscriber接口提供了第4个方法
void onSubscribe (Subscription subscription);

当第一个事件通过Publisher与Subscriber之间的管道发送时,该方法就会被调用执行。Subscription对象包含的方法可以帮助Subscriber与Publisher进行通信

interface Subscription {void     cancel ();void     request (long n);
}/**
注意,回调函数经常有的“似乎后向兼容”效果。
Publisher创建了Subscription对象,并将其传递给Subscriber,
后者又可以调用它的方法由Subscriber向Publisher回传信息
**/

一种简单的真实背压

“发布-订阅”连接每次只处理一个事件

❏ 在Subscriber中本地存储由OnSubscribe方法传递的Subscription对象,为此,你可能需要为其添加一个subscription字段。

❏ 让onSubscribe、onNext和onError(有可能也需要)的最后一个动作都是使用channel.request(1)请求下一个事件(注意只请求一个事件,避免Subscriber被太多的事件淹没)。

❏ 修改Publisher,让本例中的notifyAllSubscribers方法只对提交了请求的管道发送onNext或者onError事件。

❏ 通常,Publisher会创建一个新的Subscription对象,并将其与Subscriber一一对应,这样才能确保多个Subscriber可以按照自己设定的背压处理数据。

流程看似简单,但实现背压时还需要额外考虑一系列的取舍

❏ 你是否要以最低速度向多个Subscriber发送事件?或者你是否要为每个Subscriber维护一个单独的未发送数据队列?

❏ 如果这些队列增长过快,会发生什么情况?

❏ 如果Subscriber还未准备好接收数据,你会丢弃事件么?

“基于拉模式的反应式背压”这一概念。之所以称其为“基于拉模式的反应式”,是因为它为Subscriber提供了一种途径,借助于事件(反应式)去“拉取”(通过request方法)Publisher提供的更多信息。其结果就是背压机制。

反应式系统和反应式编程

反应式系统是一个程序,其架构很灵活,可以在运行时调整以适应变化的需求。三大特性可以概括为响应性、韧性和弹性。

响应性意味着反应式系统不能因为正在替某人处理一个大型任务就延迟其他用户的查询请求,它必须实时地对输入进行响应。韧性意味着系统不能因为某个组件失效就无法提供服务。某个网络连接出现问题,不应该影响其他网络的查询服务,对无法响应组件的查询应该被重新路由到备用组件上。弹性意味着系统可以调整以适应工作负荷的变化,持续高效地运行。

就像可以在酒吧中动态调整提供食物和提供酒水服务的员工,让两个队列的等待时间都保持一致,同样,也可以调整各种软件服务的工作线程数,避免工作线程处于闲等状态,以使每个队列都能高效地处理。

主流的方式是使用由Java的java.util. concurrent.Flow接口提供的反应式编程。这些接口的设计反映了反应式宣言的第4个,也是最后一个属性,即消息驱动。消息驱动的系统基于线框-管道模型提供了内部API,组件等待要处理的输入,处理结果通过消息发送给其他的组件,以这种方式创建了一个反应式系统。

小结

❏ Java对并发的支持由来已久,并且还在持续演进。通常而言,线程池技术很有帮助,然而如果你有大量可能阻塞的任务,使用它反而会带来麻烦。

❏ 方法异步化(在完成它们的工作之前返回)能提升程序的并发度,其可以与用于循环结构的优化进行互补。

❏ 使用线框-管道模型可以对异步系统进行可视化。

❏ Java 8的CompletableFuture类和Java 9的Flow API都可以通过线框-管道图表示。

❏ CompletableFuture类常用于一次性的异步计算。使用结合器可以组合多个异步计算,并且无须担心使用Future时的阻塞风险。

❏ Flow API基于“发布-订阅”协议,它与背压一起构成了Java反应式编程的基础。❏ 反应式编程可以帮助实现反应式系统。

CompletableFuture:组合式异步编程

Future接口

Java 5引入了Future接口,它的设计初衷是对将来某个时刻会发生的结果进行建模。举个例子,调用方发起远程服务查询时,它是无法立刻得到查询结果的。采用Future接口可以对异步计算进行建模,返回一个指向执行结果的引用,运算结束后,调用方可以通过该引用访问执行的结果。在Future中触发那些可能耗时的调用,能够将调用线程解放出来,让它们继续执行其他有价值的工作,不必呆呆等待耗时的操作完成。

Future的另一大优点是它比更底层的Thread更好用。要使用Future,通常你只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService,就好了。

这种编程方式让线程可以在ExecutorService以并发方式调用另一个线程执行耗时操作的同时,去执行一些其他的任务。

接着,如果已经运行到没有异步操作的结果就无法继续任何有意义的工作时,可以调用它的get方法去获取操作的结果。如果操作已经完成,该方法会立刻返回操作的结果,否则它会阻塞线程,直到操作完成,返回相应的结果。

使用Future以异步方式执行长时间的操作

虽然Future提供了一个无须任何参数的get方法,推荐使用重载版本的get方法,它接受一个超时的参数,通过它,可以定义线程等待Future结果的最长时间,而不是永无止境地等待下去

Future接口的局限性

Future接口提供了方法来检测异步计算是否已经结束(使用isDone方法),等待异步操作结束,以及获取计算的结果。但是这些特性还不足以编写简洁的并发代码。

难表述Future结果之间的依赖性;从文字描述上这很简单,“当长时间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并”。

描述能力特性

❏ 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。

❏ 等待Future集合中的所有任务都完成。

❏ 仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。

❏ 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。

❏ 应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。

CompletableFuture类(它实现了Future接口)如何利用Java 8的新特性以更直观的方式将上述需求都变为可能。Stream和CompletableFuture的设计都遵循了类似的模式:都使用了Lambda表达式以及流水线的思想。从这个角度,可以说CompletableFuture和Future的关系就跟Stream和Collection的关系一样。

使用CompletableFuture构建异步应用

”最佳价格查询器”(best-price-finder)的应用,查询多个在线商店,依据给定的产品或服务找出最低的价格。这个过程中,你会学到几个重要的技能。

❏ 首先,你会学到如何为你的客户提供异步API(如果你拥有一间在线商店的话,这是非常有帮助的)。

❏ 其次,掌握如何使用了同步API的代码变为非阻塞代码。会了解如何使用流水线将两个接续的异步操作合并为一个异步计算操作。这种情况肯定会出现,比如,在线商店返回了你想要购买商品的原始价格,并附带着一个折扣代码——最终,要计算出该商品的实际价格,不得不访问第二个远程折扣服务,查询该折扣代码对应的折扣比率。

❏ 会学到如何以响应式的方式处理异步操作的完成事件,以及随着各个商店返回它的商品价格,最佳价格查询器如何持续地更新每种商品的最佳推荐,而不是等待所有的商店都返回他们各自的价格(这种方式存在着一定的风险,一旦某家商店的服务中断,用户就可能遭遇白屏)。

:::danger
同步API与异步API
同步API 其实只是对传统方法调用的另一种称呼:

调用了某个方法,调用方在被调用方执行的过程中会等待,被调用方执行结束返回,调用方取得被调用方的返回值并继续运行。即使调用方和被调用方在不同的线程中运行,调用方还是需要等待被调用方结束运行,这就是阻塞式调用名字的由来。

与此相反,异步API会直接返回,或者至少在被调用方计算完成之前,将它剩余的计算任务交由另一个线程去做,该线程和调用方是异步的——这就是非阻塞式调用的由来。

执行剩余计算任务的线程会将它的计算结果返回给调用方。返回的方式要么是通过回调函数,要么是由调用方再次执行一个“等待,直到计算完成”的方法调用。这种风格的计算在I/O系统程序设计中很常见:发起了一次磁盘访问,如果同时还有很多其他计算任务,那这次访问与其他计算任务会异步执行,完成其他任务没有别的事情做时,会等待磁盘块载入内存。

注意,阻塞和非阻塞通常用于描述操作系统的某种I/O实现。

然而,这些术语也常常等价地用在非I/O的上下文中,即“异步调用”和“同步调用”。

:::

实现异步API

public class Shop {public double getPrice(String product) {// 待实现}
}
/**该方法的内部实现会查询商店的数据库,
但也有可能执行一些别的耗时的任务,
比如联系其他外部服务
(比如,商店的供应商,或者跟制造商相关的推广折扣)
**/
//模拟1秒钟延迟的方法
public static void delay() {try {Thread.sleep(1000L);} catch (InterruptedException e) {throw new RuntimeException(e);}
}
/**getPrice方法会调用delay方法,并返回一个随机计算的值
取巧
使用charAt,依据产品的名称,生成一个随机值作为价格
**///在getPrice方法中引入一个模拟的延迟
public double getPrice(String product) {
return calculatePrice(product);
}
private double calculatePrice(String product) {delay();return random.nextDouble() * product.charAt(0) + product.charAt(1);
}

这个API的使用者(这个例子中为最佳价格查询器)调用该方法时,它依旧会被阻塞。为等待同步事件完成而等待1秒钟,这是无法接受的,尤其是考虑到最佳价格查询器对网络中的所有商店都要重复这种操作。

将同步方法转换为异步方法

将getPrice转换为getPriceAsync方法,并修改它的返回值

public Future<Double> getPriceAsync(String product) { ... }

Java 5引入了java.util.concurrent.Future接口表示一个异步计算(即调用线程可以继续运行,不会因为调用方法而阻塞)的结果。

意味着Future是一个暂时还不可知值的处理器,这个值在计算完成后,可以通过调用它的get方法取得。

因为这样的设计,getPriceAsync方法才能立刻返回,给调用线程一个机会,能在同一时间去执行其他有价值的计算任务。

新的CompletableFuture类提供了大量的方法,让我们有机会以多种可能的方式轻松地实现这个方法

getPriceAsync方法的实现

:::danger
创建了一个代表异步计算的CompletableFuture对象实例,它在计算完成时会包含计算的结果。

接着,调用fork创建了另一个线程去执行实际的价格计算工作,不等该耗时计算任务结束,直接返回一个Future实例。

当请求的产品价格最终计算得出时,可以使用它的complete方法,结束CompletableFuture对象的运行

:::

显然,这个新版Future的名称也解释了它所具有的特性。使用这个API的客户端,可以通过下面的这段代码对其进行调用

使用异步API

客户向商店查询了某种商品的价格。由于商店提供了异步API,该次调用立刻返回了一个Future对象,通过该对象客户可以在将来的某个时刻取得商品的价格。

这种方式下,客户在进行商品价格查询的同时,还能执行一些其他的任务,比如查询其他家商店中商品的价格,而不会呆呆地阻塞在那里等待第一家商店返回请求的结果。

最后,如果所有有意义的工作都已经完成,客户所有要执行的工作都依赖于商品价格时,就再调用Future的get方法。执行了这个操作后,客户要么获得Future中封装的值(如果异步任务已经完成),要么发生阻塞,直到该异步任务完成,期望的值能够访问

Invocation returned after 43 msecs
Price is 123.26
Price returned after 1045 msecs

getPriceAsync方法的调用返回远远早于最终价格计算完成的时间

避免发生客户端被阻塞的风险:Future执行完毕可以发送一个通知,仅在计算结果可用时执行一个由Lambda表达式或者方法引用定义的回调函数

错误处理

如果价格计算过程中产生了错误会怎样呢?

用于提示错误的异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待get方法返回结果的客户端永久地被阻塞。

解决:

  1. 客户端可以使用重载版本的get方法,它使用一个超时参数来避免发生这样的情况。
  2. 代码中添加超时判断的逻辑,避免发生类似的问题。使用这种方法至少能防止程序永久地等待下去,超时发生时,程序会得到通知发生了TimeoutException

让客户端能了解商店无法提供请求商品价格的原因,需要使用CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问题的异常抛出

抛出CompletableFuture内的异常

客户端现在会收到一个ExecutionException异常,该异常接受了一个包含失败原因的Exception参数,即价格计算方法最初抛出的异常。

如果该方法抛出了一个运行时异常“product isn't available”,客户端就会得到像下面这样一段ExecutionException:

Exception in thread "main" java.lang.RuntimeException:
java.util.concurrent.ExecutionException: java.lang.RuntimeException:
product not available
at java89inaction.chap16.AsyncShopClient.main(AsyncShopClient.java:16)
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException:
product not available
at java.base/java.util.concurrent.CompletableFuture.reportGet
(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get
(CompletableFuture.java:1999)
at java89inaction.chap16.AsyncShopClient.main(AsyncShopClient.java:14)
Caused by: java.lang.RuntimeException: product not available
at java89inaction.chap16.AsyncShop.calculatePrice(AsyncShop.java:38)
at java89inaction.chap16.AsyncShop.lambda$0(AsyncShop.java:33)at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run
(CompletableFuture.java:1700)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec
(CompletableFuture.java:1692)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:283)
at java.base/java.util.concurrent.ForkJoinPool.runWorker
(ForkJoinPool.java:1603)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run
(ForkJoinWorkerThread.java:175)

使用工厂方法supplyAsync创建CompletableFuture

CompletableFuture类自身提供了大量精巧的工厂方法,使用这些方法能更容易地完成整个流程,还不用担心实现的细节

//使用工厂方法supplyAsync创建CompletableFuture对象
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。

生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,但是你也可以使用supplyAsync方法的重载版本,传递第二个参数指定不同的执行线程执行生产者方法。

向CompletableFuture的工厂方法传递可选参数,指定生产者方法的执行线程是可行的

:::info
getPriceAsync方法返回的CompletableFuture对象

手工创建和完成的CompletableFuture对象是完全等价的

这意味着它提供了同样的错误管理机制,而前者花费了大量的精力才得以构建。

:::

让代码免受阻塞之苦

List<Shop> shops = List.of(new Shop("BestPrice"),new Shop("LetsSaveBig"),new Shop("MyFavoriteShop"),new Shop("BuyItAll"));

使用下面这样的签名实现一个方法,它接受产品名作为参数,返回一个字符串列表,这个字符串列表中包括商店的名称和该商店中指定商品的价格

public List<String> findPrices(String product);

顺序查询所有商店的方式实现的findPrices方法

public List<String> findPrices(String product) {
return shops.stream()
.map(shop -> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product)))
.collect(toList());
}

验证findPrices的正确性和执行性能

long start = System.nanoTime();
System.out.println(findPrices("myPhone27S"));
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Done in " + duration + " msecs");
//输出
/**
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
is 214.13, BuyItAll price is 184.74]
Done in 4032 msecs
**/

findPrices方法的执行时间仅比四秒钟多了那么几毫秒,因为对这四个商店的查询是顺序进行的,并且一个查询操作会阻塞另一个,每一个操作都要花费大约1秒左右的时间计算请求商品的价格

使用并行流对请求进行并行操作

新版findPrices的改进

[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price
is 214.13, BuyItAll price is 184.74]
Done in 1180 msecs

现在对4个不同商店的查询实现了并行,所以完成所有操作的总耗时只有1秒多一点儿。尝试使用刚学过的Completable-Future,将findPrices方法中对不同商店的同步调用替换为异步调用。

使用CompletableFuture发起异步请求

使用工厂方法supplyAsync创建CompletableFuture对象

List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product))))
.collect(toList());

得到一个List<CompletableFuture>,列表中的每个CompletableFuture对象在计算完成后都包含商店的String类型的名称。

由于用CompletableFuture实现的findPrices方法要求返回一个List,因此你需要等待所有的future执行完毕,将其包含的值抽取出来,填充到列表中才能返回。

向最初的List<CompletableFuture>施加第二个map操作,对List中的所有future对象执行join操作,一个接一个地等待它们运行结束。

注意CompletableFuture类中的join方法和Future接口中的get方法有相同的含义,并且也声明在Future接口中,它们唯一的不同是join不会抛出任何检测到的异常。使用它不再需要使用try/catch语句块让你传递给第二个map方法的Lambda表达式变得过于臃肿。使用CompletableFuture实现findPrices方法

这里使用了两个不同的Stream流水线,而不是在同一个处理流的流水线上一个接一个地放置两个map操作——这其实是有缘由的。

考虑流操作之间的延迟特性,如果在单一流水线中处理流,那么发向不同商家的请求只能以同步、顺序执行的方式才会成功。

因此,每个创建CompletableFuture对象只能在前一个操作结束之后执行查询指定商家的动作,通知join方法返回计算结果流程图。为什么Stream的延迟特性会引起顺序执行,以及如何避免

上半部分展示了使用单一流水线处理流的过程,执行的流程(以虚线标识)是顺序的。

事实上,新的CompletableFuture对象只有在前一个操作完全结束之后,才能创建。

与此相反,下半部分展示了如何先将CompletableFuture对象聚集到一个列表中(即图中以椭圆表示的部分),让对象们可以在等待其他对象完成操作之前就能启动。

:::info
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74]

Done in 2005 msecs

:::

超过两秒意味着利用CompletableFuture实现的版本,原生顺序执行且会发生阻塞的版本快。但是它的用时也差不多是使用并行流的前一个版本的两倍。尤其是,考虑到从顺序执行的版本转换到并行流的版本只做了非常小的改动,不尽人意

寻找更好的方案

并行流的版本工作得非常好,那是因为它能并行地执行四个任务,所以它几乎能为每个商家分配一个线程。但是,如果想要增加第5个商家到商店列表中,让“最佳价格查询”应用对其进行处理,那这时会发生什么情况?毫不意外,顺序执行版本的执行还是需要大约5秒多钟的时间

并行流版本的程序这次比之前也多消耗了差不多1秒钟的时间,因为可以并行运行(通用线程池中处于可用状态)的四个线程现在都处于繁忙状态,都在对前四个商店进行查询。第5个查询只能等到前面某一个操作完成以释放出空闲线程才能继续

CompletableFuture版本的程序似乎比并行流版本的程序还快那么一点儿。但是最后这个版本也不太令人满意。

试图让代码处理九个商店,那么并行流版本耗时3143毫秒,而CompletableFuture版本耗时3009毫秒。

究其原因都一样:它们内部采用的是同样的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime.getRuntime().availableProcessors()的返回值。然而,CompletableFuture具有一定的优势,因为它允许你对执行器(Executor)进行配置,尤其是线程池的大小,让它以更适合应用需求的方式进行配置,满足程序的要求,而这是并行流API无法提供的。

使用定制的执行器

明智的选择似乎是创建一个配有线程池的执行器,线程池中线程的数目取决于你预计你的应用需要处理的负荷

《Java并发编程实战》一书中,Brian Goetz和合著者们为线程池大小的优化提供了不少中肯的建议。这非常重要,如果线程池中线程的数量过多,最终它们会竞争稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正如你的应用所面临的情况,处理器的一些核可能就无法充分利用。Brian Goetz建议,线程池大小与处理器的利用率之比可以使用下面的公式进行估算

$ N_{thread}=N_{CPU} * U_{CPU} * (1+W/C) $

❏ NCPU是处理器的核的数目,可以通过Runtime.getRuntime().available Processors()得到;

❏ UCPU是期望的CPU利用率(该值应该介于0和1之间);

❏ W/C是等待时间与计算时间的比率。

应用99%的时间都在等待商店的响应,所以估算出的W/C比率为100。意味着如果你期望的CPU利用率是100%,那么你需要创建一个拥有400个线程的线程池。

实际操作中,如果创建的线程数比商店的数目更多,反而是一种浪费,因为这样做之后,线程池中的有些线程根本没有机会被使用。

出于这种考虑,建议将执行器使用的线程数,与需要查询的商店数目设定为同一个值,这样每个商店都应该对应一个服务线程。

不过,为了避免发生由于商店的数目过多导致服务器超负荷而崩溃,还是需要设置一个上限,比如100个线程。“最优价格查询器”应用定制的执行器

现在正创建的是一个由守护线程构成的线程池。当一个普通线程在执行时,Java程序无法终止或者退出,所以最后剩下的那个线程会由于一直等待无法发生的事件而引发问题。

与此相反,如果将线程标记为守护进程,则意味着程序退出时它也会被回收。这二者之间没有性能上的差异。

可以将执行器作为第二个参数传递给supplyAsync工厂方法

//创建一个可查询指定商品价格的CompletableFuture对象
CompletableFuture.supplyAsync(() -> shop.getName() + " price is " +shop.getPrice(product), executor);

使用CompletableFuture方案的程序处理五个商店仅耗时1021毫秒,处理九个商店时耗时1022毫秒。

一般而言,这种状态会一直持续,直到商店的数目达到我们之前计算的阈值400。

这个例子证明了要创建更适合你的应用特性的执行器,利用CompletableFuture向其提交任务执行是个不错的主意。

处理需大量使用异步操作的情况时,这几乎是最有效的策略。

并行——使用流还是CompletableFuture?

对集合进行并行计算有两种方式:

  1. 要么将其转化为并行流,利用map这样的操作开展工作,
  2. 要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。

后者提供了更多的灵活性,可以调整线程池的大小,而这能帮助确保整体的计算不会因为线程都在等待I/O而发生阻塞。

建议

❏ 如果进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。

❏ 反之,如果并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,依据等待/计算,或者W/C的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。

对多个异步任务进行流水线操作

假设所有的商店都同意使用一个集中式的折扣服务。该折扣服务提供了五个不同的折扣代码,每个折扣代码对应不同的折扣率。使用一个枚举型变量Discount.Code来实现这一想法

//以枚举类型定义的折扣代码
public class Discount {public enum Code {NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);private final int percentage;Code(int percentage) {this.percentage = percentage;}}// Discount类的具体实现这里暂且不表示,参见代码清单16-14
}

假设所有的商店都同意修改getPrice方法的返回格式。getPrice现在以ShopName:price:DiscountCode的格式返回一个String类型的值。我们的示例实现中会返回一个随机生成的Discount.Code,以及已经计算得出的随机价格

public String getPrice(String product) {
double price = calculatePrice(product);
Discount.Code code = Discount.Code.values()[
random.nextInt(Discount.Code.values().length)];
return String.format("%s:%.2f:%s", name, price, code);
}
private double calculatePrice(String product) {delay();return random.nextDouble() * product.charAt(0) + product.charAt(1);
}

调用getPrice方法可能会返回像下面这样一个String值:调用getPrice方法可能会返回像下面这样一个String值

实现折扣服务

“最佳价格查询器”应用现在能从不同的商店取得商品价格,解析结果字符串,针对每个字符串,查询折扣服务器的折扣代码。

流程决定了请求商品的最终折扣价格(每个折扣代码的实际折扣比率有可能发生变化,所以每次都需要查询折扣服务)。

//对商店返回字符串的解析操作封装到了下面的Quote类之中
public class Quote {private final String shopName;private final double price;private final Discount.Code discountCode;public Quote(String shopName, double price, Discount.Code code) {this.shopName = shopName;this.price = price;this.discountCode = code;}public static Quote parse(String s) {String[] split = s.split(":");String shopName = split[0];double price = Double.parseDouble(split[1]);Discount.Code discountCode = Discount.Code.valueOf(split[2]);return new Quote(shopName, price, discountCode);}public String getShopName() { return shopName; }public double getPrice() { return price; }public Discount.Code getDiscountCode() { return discountCode; }
}

传递shop对象返回的字符串给静态工厂方法parse,你可以得到Quote类的一个实例,它包含了shop的名称、折扣之前的价格,以及折扣代码。

Discount服务还提供了一个applyDiscount方法,它接受一个Quote对象,返回一个字符串,表示生成该Quote的shop中的折扣价格

使用Discount服务

由于Discount服务是一种远程服务,因此你还需要增加1秒钟的模拟延迟,首先尝试以最直接的方式(坏消息是,这种方式是顺序而且同步执行的)重新实现findPrices,以满足这些新增的需求。

在shop构成的流上采用流水线方式执行三次map操作,我们得到了期望的结果。

❏ 第一个操作将每个shop对象转换成了一个字符串,该字符串包含了该shop中指定商品的价格和折扣代码。

❏ 第二个操作对这些字符串进行了解析,在Quote对象中对它们进行转换。

❏ 最终,第三个map会操作联系远程的Discount服务,计算出最终的折扣价格,并返回包含该价格及提供该价格商品的shop的字符串。

这种实现方式的性能远非最优

:::info
[BestPrice price is 110.93, LetsSaveBig price is 135.58, MyFavoriteShop price is 192.72, BuyItAll price is 184.74, ShopEasy price is 167.28]

Done in 10028 msecs

:::

耗时10s。顺序查询五个商店耗时大约5秒,现在又加上了Discount服务为五个商店返回的价格申请折扣所消耗的5秒钟

  • 已知流转换为并行流的方式,非常容易提升该程序的性能
  • 也知这一方案在商店的数目增加时,扩展性不好,因为Stream底层依赖的是线程数量固定的通用线程池
  • 也知道,通过自定义CompletableFuture调度任务执行的执行器能够更充分地利用CPU资源。

构造同步和异步操作

使用CompletableFuture提供的特性,以异步方式重新实现findPrices方法

使用CompletableFuture实现findPrices方法

三次转换:构造同步操作和异步任务

只需要将Lambda表达式作为参数传递给supplyAsync工厂方法就可以以异步方式对shop进行查询。

获取

第一个转换的结果是一个Stream<CompletableFuture>,一旦运行结束,每个CompletableFuture对象中都会包含对应shop返回的字符串。

对CompletableFuture进行了设置对CompletableFuture进行了设置

解析价格

第二次转换将字符串转变为订单。因为一般情况下解析操作不涉及任何远程服务,也不会进行任何I/O操作,它几乎可以在第一时间进行,所以能够采用同步操作,不会带来太多的延迟。

由于这个原因,对第一步中生成的CompletableFuture对象调用它的thenApply,将一个由字符串转换Quote的方法作为参数传递给它。

直到你调用的CompletableFuture执行结束,使用的thenApply方法都不会阻塞你代码的执行。

这意味着CompletableFuture最终结束运行时,你希望传递Lambda表达式给thenApply方法,将Stream中的每个CompletableFuture对象转换为对应的CompletableFuture对象。

你可以把这看成是为处理CompletableFuture的结果建立了一个菜单,就像曾经为Stream的流水线所做的事儿一样。

为计算折扣价格构造Future

第三个map操作涉及联系远程的Discount服务,为从商店中得到的原始价格申请折扣率。

这一转换与前一个转换又不大一样,因为这一转换需要远程执行(或者,就这个例子而言,需要模拟远程调用带来的延迟),出于这一原因,也希望它能够异步执行。

实现这一目标,像第一个调用传递getPrice给supplyAsync那样,将这一操作以Lambda表达式的方式传递给了supplyAsync工厂方法,该方法最终会返回另一个CompletableFuture对象。

到目前为止,已经进行了两次异步操作,用了两个不同的CompletableFuture对象进行建模,希望能把它们以级联的方式串接起来进行工作。

  • ❏ 从shop对象中获取价格,接着把价格转换为Quote。
  • ❏ 拿到返回的Quote对象,将其作为参数传递给Discount服务,取得最终的折扣价格。

Java 8的CompletableFutureAPI提供了名为thenCompose的方法

它就是专门为这一目的而设计的,thenCompose方法允许你对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。

换句话说,你可以创建两个CompletableFuture对象,对第一个CompletableFuture对象调用thenCompose,并向其传递一个函数。

当第一个CompletableFuture执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个CompletableFuture的返回做输入计算出的第二个CompletableFuture对象。

使用这种方式,即使Future在向不同的商店收集报价,主线程还是能继续执行其他重要的操作,比如响应UI事件。

将这三次map操作返回的Stream元素收集到一个列表,你就得到了一个List<CompletableFuture>,等这些CompletableFuture对象最终执行完毕,就可以利用join取得它们的返回值。

thenCompose方法像CompletableFuture类中的其他方法一样,也提供了一个以Async后缀结尾的版本thenComposeAsync。

通常而言,名称中不带Async的方法和它的前一个任务一样,在同一个线程中运行,而名称以Async结尾的方法会将后续任务提交到一个线程池,所以每个任务是由不同的线程处理的。

就这个例子而言,第二个CompletableFuture对象的结果取决于第一个CompletableFuture,

所以无论使用哪个版本的方法来处理CompletableFuture对象,对于最终的结果,或者大致的时间而言都没有多少差别。

我们选择thenCompose方法的原因是因为它更高效一点,因为它少了很多线程切换的开销。

注意,即便如此,也很难搞清楚到底使用的是哪一个线程,尤其是应用还使用了自己的线程池(譬如Spring),那就更加困难了。

将两个CompletableFuture对象整合起来,无论它们是否存在依赖

对一个CompletableFuture对象调用了thenCompose方法,并向其传递了第二个CompletableFuture,而第二个CompletableFuture又需要使用第一个CompletableFuture的执行结果作为输入。

但是,另一种比较常见的情况是,需要将两个完全不相干的CompletableFuture对象的结果整合起来,而且也不希望等到第一个任务完全结束才开始第二个任务。

应该使用thenCombine方法,它接受名为BiFunction的第二个参数,这个参数定义了当两个CompletableFuture对象完成计算后,结果如何合并。同thenCompose方法一样,thenCombine方法也提供了一个Async的版本。这里,如果使用thenCombineAsync会导致BiFunction中定义的合并操作被提交到线程池中,那么由另一个任务以异步的方式执行。

示例

一家商店提供的价格是以欧元(EUR)计价的,但是你希望以美元的方式提供给你的客户。

可以用异步的方式向商店查询指定商品的价格,同时从远程的汇率服务那里查到欧元和美元之间的汇率。

当二者都结束时,再将这两个结果结合起来,用返回的商品价格乘以当时的汇率,得到以美元计价的商品价格。

用这种方式,需要使用第三个CompletableFuture对象,当前两个CompletableFuture计算出结果,并由BiFunction方法完成合并后,由它来最终结束这一任务合并两个独立的CompletableFuture对象

整合的操作只是简单的乘法操作,用另一个单独的任务对其进行操作有些浪费资源,所以只要使用thenCombine方法,无须特别求助于异步版本的thenCombineAsync方法。

创建的多个任务是如何在线程池中选择不同的线程执行的,以及最终的运行结果又是如何整合的。合并两个相互独立的异步任务

对Future和CompletableFuture的回顾

CompletableFuture利用Lambda表达式以声明式的API提供了一种机制,能够用最有效的方式,非常容易地将多个以同步或异步方式执行复杂操作的任务结合到一起。利用Java 7的方法合并两个Future对象

通过向执行器提交一个Callable对象的方式创建了第一个Future对象,向外部服务查询欧元和美元之间的转换汇率。紧接着,创建了第二个Future对象,查询指定商店中特定商品的欧元价格。

在同一个Future中通过查询商店得到的欧元商品价格乘以汇率得到了最终的价格

使用thenCombineAsync,不使用thenCombine,采用第三个Future单独进行商品价格和汇率的乘法运算,效果是几乎相同的。这两种实现看起来没太大区别,原因是你只对两个Future进行了合并。

高效地使用超时机制

读取采用Future计算结果值时,为了避免线程等待结果返回导致的永久阻塞,设定一个超时机制是个不错的主意。Java 9通过CompletableFuture提供了多个方法,可以更加灵活地设置线程的超时机制。

orTimeout在指定的超时到达时,会通过Scheduled-ThreadExecutor线程结束该CompletableFuture对象,并抛出一个TimeoutException异常,它的返回值是一个新的CompletableFuture对象。

凭借这一方法,可以将计算流水线串接起来,发生TimeoutException异常时,反馈一个友好的消息给用户。

为CompletableFuture添加超时

有时,如果服务偶然性地无法及时响应,临时使用默认值继续执行也是一种可接受的解决方案。

期望汇率服务1秒钟之内就能返回欧元到美元的兑换汇率。不过,即便请求耗时更长,也不希望程序直接抛出一个异常,让之前的计算开销付之东流。

这种情况下,希望程序可以退化为使用预先定义的汇率。通过Java 9新引入的completeOnTimeout方法,可以轻松地完成这一任务,为程序添加第二种超时机制,超时之后,采用默认值继续执行CompletableFuture

同orTimeout方法一样,completeOnTimeOut方法也返回一个CompletableFuture,可以将它与其他的CompletableFuture方法链接起来。

简短地回顾一下,目前我们已经能配置两种类型的超时:一种是如果程序执行超时,譬如超过3秒,整个计算都会失败;另一种是如果程序执行超时,譬如超过1秒,还可以使用预定义的默认值继续执行,不会发生失效。

CompletableFuture自身执行完毕之前,调用它的get或者join方法,执行都会被阻塞。、

响应CompletableFuture的completion事件

//一个模拟生成0.5秒至2.5秒随机延迟的方法
private static final Random random = new Random();
public static void randomDelay() {int delay = 500 + random.nextInt(2000);try {Thread.sleep(delay);} catch (InterruptedException e) {throw new RuntimeException(e);}
}

只要有商店返回商品价格就在第一时间显示返回值,不再等待那些还未返回的商店(有些甚至会发生超时)。

避免问题

等待创建一个包含了所有价格的List创建完成。你应该做的是直接处理CompletableFuture流,这样每个CompletableFuture都在为某个商店执行必要的操作

//重构findPrices方法返回一个由Future构成的流
public Stream<CompletableFuture<String>> findPricesStream(String product) {
return shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}

为findPricesStream方法返回的Stream添加了第4个map操作,在此之前,已经在该方法内部调用了三次map。

这个新添加的操作其实很简单,只是在每个CompletableFuture上注册一个操作,该操作会在CompletableFuture完成执行后使用它的返回值。

Java 8的CompletableFutureAPI通过thenAccept方法提供了这一功能,它接受CompletableFuture执行完毕后的返回值做参数。

在这里的例子中,该值是由Discount服务返回的字符串值,它包含了提供请求商品的商店名称及折扣价格

findPricesStream("myPhone").map(f -> f.thenAccept(System.out::println));

和之前看到的thenCompose和thenCombine方法一样,thenAccept方法也提供了一个异步版本,名为thenAcceptAsync。

thenAccept方法已经定义了如何处理CompletableFuture返回的结果,一旦CompletableFuture计算得到结果,它就返回一个CompletableFuture

因此,map操作返回的是一个Stream<CompletableFuture>。对这个CompletableFuture对象,能做的事非常有限,只能等待其运行结束

还希望能给最慢的商店一些机会,让它有机会打印输出返回的价格。为了实现这一目的,把构成Stream的所有CompletableFuture对象放到一个数组中,等待所有的任务执行完成

//响应CompletableFuture的completion事件
CompletableFuture[] futures = findPricesStream("myPhone")
.map(f -> f.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();

allOf工厂方法接受一个由CompletableFuture构成的数组,数组中的所有CompletableFuture对象执行完成之后,它返回一个CompletableFuture对象。

意味着,如果你需要等待最初Stream中的所有CompletableFuture对象执行完毕,那么对allOf方法返回的CompletableFuture执行join操作是个不错的主意。

另一些场景中,你可能希望只要CompletableFuture对象数组中有任何一个执行完毕就不再等待,比如,你正在查询两个汇率服务器,任何一个返回了结果都能满足你的需求。在这种情况下,使用一个类似的工厂方法anyOf。该方法接受一个CompletableFuture对象构成的数组,返回由第一个执行完毕的CompletableFuture对象的返回值构成的CompletableFuture

实践

long start = System.nanoTime();
CompletableFuture[] futures = findPricesStream("myPhone27S")
.map(f -> f.thenAccept(s -> System.out.println(s + " (done in " +((System.nanoTime() - start) / 1_000_000) + " msecs)")))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
System.out.println("All shops have now responded in "+ ((System.nanoTime() - start) / 1_000_000) + " msecs");
"""
BuyItAll price is 184.74 (done in 2005 msecs)
MyFavoriteShop price is 192.72 (done in 2157 msecs)
LetsSaveBig price is 135.58 (done in 3301 msecs)
ShopEasy price is 167.28 (done in 3869 msecs)
BestPrice price is 110.93 (done in 4188 msecs)
All shops have now responded in 4188 msecs
"""

小结

❏ 执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度。

❏ 尽可能地为客户提供异步API。使用CompletableFuture类提供的特性,能够轻松地实现这一目标。

❏ CompletableFuture类还提供了异常管理的机制,让你有机会抛出/管理异步任务执行中发生的异常。❏ 将同步API的调用封装到一个CompletableFuture中,你能够以异步的方式使用其结果。

❏ 如果异步任务之间相互独立,或者它们之间某一些的结果是另一些的输入,那么可以将这些异步任务构造或者合并成一个。

❏ 可以为CompletableFuture注册一个回调函数,在Future执行完毕或者它们计算的结果可用时,针对性地执行一些程序。

❏ 可以决定在什么时候结束程序的运行,是等待由CompletableFuture对象构成的列表中所有的对象都执行完毕,还是只要其中任何一个首先完成就中止程序的运行。

❏ Java 9通过orTimeout和completeOnTimeout方法为CompletableFuture增加了对异步超时机制的支持

反应式编程(Reactive响应式/反应式编程)

采用反应式流(reactive stream)、 一种广泛采用的反应式库——RxJava、使用弹珠图可视化地记录反应式流上的操作

用途

❏ 大数据——以PB计量的大数据,并且数量还在不断增加。

❏ 异构环境——应用被部署到完全异构的环境中,它可能是移动设备,也可能是运行着数千个多核处理器的云端集群。

❏ 使用模式——用户的期望发生了变化,现在用户期望毫秒级的响应时间,希望应用百分之百时时刻刻都在线。

反应式编程让你能以异步的方式处理、整合来自不同系统和源头的数据流。可以在处理数据的同时进行反馈,让数据对用户的响应更及时。反应式编程不仅可以构建单一组件或者应用,还能用于协调多个组件,将它们搭建成一个反应式系统。同样的方式,系统工程师能依据网络的变化调整消息路由,从而保证系统在高负荷或者发生节点失效时依旧能稳定地提供服务。

以这种新型混聚、松散耦合方式构建的系统中,组件很多时候就是应用

反应式应用和规范(反应式宣言)

反应式应用的四个典型特征。

❏ 响应性——顾名思义,反应式系统的响应时间很快,更重要的是它的响应时间应该是稳定且可预测的。只有这样,用户才能明确地设定他的预期。而这反过来又会增强用户的信心,是应用易用性的关键指标。

❏ 韧性——系统在出现失败时依然能继续响应服务。为了构建弹性的应用,反应式宣言提供了一系列的建议,包括组件运行时复制,从时间(发送方和接受方都拥有相互独立的生命周期)和空间(发送方和接收方运行于不同的进程)维度对组件进行解耦,从而使任何一个组件都能以异步的方式向其他组件分发任务。

❏ 弹性——影响应用响应性的另一个重要因素是应用的工作负载。应用生命周期中不可避免地会遭遇各种规模的负载。反应式系统在设计时就需要考虑这一点,增加分配的资源后,受影响的组件要有能力自动地适配和服务更大的负荷。

❏ 消息驱动——韧性和弹性要求明确定义构成系统的组件之间的边界,从而确保组件间的松耦合、组件隔离以及位置透明性。跨组件通信则通过异步消息传递。这种设计既实现了韧性(以消息传递组件失败)又确保了弹性(通过监控交换消息规模的变化,适时调整资源分配,从而实现资源配置的优化,满足业务的需求)

四个特征之间的相互依赖关系。

搭建小型应用内部架构还是选择用什么策略协调各个应用来构建一个大型系统。关于应用这些思想的细节,尤其是如何界定组件的粒度,还需要进一步的讨论。

反应式系统的关键特征

应用层的反应式编程

对应用层组件而言,反应式编程的主要特征使得任务能以异步的方式运行。以异步非阻塞方式处理事件流对充分利用现代多核处理器至关重要,或者更确切地说,这一技术让线程尽可能地竞争处理器的使用权。为了达到这一目的,反应式编程框架和库会在轻量级的结构,譬如Future、Actor或者更常见的事件循环间共享线程(相对昂贵且稀缺的资源),以分发回调函数的结果,最终实现对事件处理结果的收集、转换和管理。

采用这种线程多路复用策略时需要特别注意一点:不要在主事件循环中添加可能阻塞的操作。提到阻塞操作,这里特别要关注的是所有I/O密集型的操作,譬如访问数据库或文件系统,或者调用远程服务,这些都是可能消耗比较长时间的事件,甚至无法预测何时能够结束。

线程多路复用时避免阻塞操作

场景:设想有这样一个典型多路复用的简单场景,这个场景中你需要创建一个两线程的线程池,处理来自三个流的事件。由于同一时刻只能处理两个流,只有通过竞争,流才能高效公平地共享那两个线程。现在假设其中一个流中,某个事件触发了一个可能很慢的I/O操作,譬如向文件系统写入数据,或者调用阻塞式API从数据库中拉取数据。

在这种情况下,线程2由于需要等待I/O操作完成,傻傻地阻塞在那里,无法继续执行有意义的工作。此时线程1还在处理第一个流的数据,阻塞操作完成之前,第三个流完全没有机会被处理。阻塞操作让线程进入闲等状态,其他的计算也无法获得执行机会

解决这一问题,大多数的反应式框架(譬如RxJava和Akka)中都可以开辟独立的线程池用于执行阻塞式操作。主线程池中运行的线程执行的都为无阻塞的操作,以确保所有的CPU核都能得到最充分的利用。为CPU密集型和I/O密集型的操作分别创建单独的线程池还有更深层的好处,可以更精细地监控不同类型任务的性能,从而更好地配置和调整线程池的规模,更好地适应业务的需求。

:::info
过遵循反应式原则开发应用只是反应式编程的一小部分,很多时候甚至不是最困难的部分。将一系列反应式应用整合成一个协调良好的交互式系统与设计一个独立高效运行的反应式应用比较起来,其重要程度不相上下。

:::

反应式系统

反应式系统是一种新型软件架构,应用这种架构多个独立应用可以像一个单一系统那样步调一致地工作,同时其又具备良好的扩展性,构成反应式系统的各个应用也是充分解耦的,因此,即使其中某一个应用发生失效,也不会拖垮整个系统。

反应式应用与反应式系统的主要区别是,前者主要对临时数据流进行处理,因此其工作模式被称为事件驱动型。而后者主要用于构造应用以及协调组件间的通信。具备这种特征的系统通常会被称为消息驱动系统。

消息驱动与事件驱动的另一个重要区别是,消息往往是直接发送给某个单一目标的,而事件会被所有注册了该事件的组件接收。

此外,还有一点非常重要,值得特别提一下,反应式系统中消息是以异步的方式发送和接收的,这种方式有效地解耦了发送方与接收方。

组件间完全的解耦合既是实现有效隔离的必要前提,也是保障系统在遭遇失效(韧性)和超大负荷(弹性)时仍能保持响应的基础。

更确切地说,反应式架构的韧性是凭借将失效隔离在组件内部,避免故障传递到临接的组件来实现的,如果不加控制的话,这种灾难传递可能会毁掉整个系统。

从反应式系统角度而言,韧性更偏向于容错。

系统不只要能优雅地降级,更重要的是能通过隔离失效组件,将系统重新拉回健康状态。

这种神奇的魔力来自于将失效控制在一个范围内,并将这些失效作为消息传递给管理组件。通过这种方式,失效节点的管理可以不受失效组件自身的影响,在一个安全的上下文中进行。

位置透明性之于韧性与隔离和解耦之于弹性一样至关重要,是反应式系统实现韧性的决定性要素。

基于位置透明性,反应式系统的所有组件都可以和其他任何服务通信,无须顾忌接收方在什么位置。位置透明性使得系统能够依据当前的负荷情况,对应用进行复制或者自动地水平扩展。

这种位置无关的扩展也是反应式应用(异步、并发、即时松耦合)与反应式系统(凭借位置透明性从空间角度解耦)之间的另一个区别。

反应式流以及Flow API

反应式编程是一种利用反应式流的编程技术。

而反应式流是以异步方式处理潜在无边界数据流的标准技术(它基于“发布-订阅”模型,也叫pub-sub),其处理时按先后次序进行,并且带有不可阻塞的背压

背压是发表-订阅模式下的一种常见的流量控制机制,目的是避免流中事件处理的消费者由于处理速度较慢,被一个或多个快速的生产者压垮。

出现这种情况时,如果受压组件发生灾难式的崩溃,或者以无法控制的方式丢弃事件都是不可接受的。组件需要一种方式来向上游生产者反馈,让它们减缓生产速度,或者告诉生产者它在接收更多数据之前,在给定的时间内能够接受和处理多少事件。

背压的这些内置要求是由流处理天然的异步特质决定的。

实际上,执行同步调用时,系统默认就会收到来自阻塞API的背压。

遭遇这种不幸的场景时,你将无法执行任何任务,直到阻塞操作完成,因此,由于等待你会浪费大量的资源。

与之相反,使用异步式API,硬件资源的使用率能够大幅提高,甚至达到其极限,不过可能由此压垮下游处理速度较慢的组件。

引入背压或者流量控制机制的目的就是解决这一问题,它们提供了一种协议,可以在不阻塞线程的情况下,避免数据接收方被压垮。


反应式的这些需求和行为都汇集浓缩到了反应式流(Reactive Streams)项目中。这个项目的成员来自于奈飞(Netflix)、红帽(Red Hat)、Twitter、Lightbend等公司。依据这些需求和行为,反应式流项目定义了实现任何反应式流都必须提供的四个相互关联的接口。这些接口现在是Java 9语言的组成部分,由新的java.util.concurrent.Flow类提供。很多第三方库,包括Akka流(Lightbend公司)、Reactor(Pivotal公司),以及Vert.x(红帽公司)都提供了这些接口的实现

Flow类

Java 9为了支持反应式编程新增了一个类:java.util.concurrent.Flow。这个类只包含一个静态组件,无法实例化。Flow类包含了四个嵌套的接口来体现反应式项目定义的标准“发布-订阅”模型,分别是:

  • ❏ 发布者(Publisher);
  • ❏ 订阅者(Subscriber);
  • ❏ 订阅(Subscription);
  • ❏ 处理者(Processor)。

凭借Flow类,相互关联的接口或者静态方法可以构造流控(flow-controlled)组件。

Publisher生产的元素可以被一个或多个Subscriber消费,Publisher与Subscriber之间的关系通过Subscription管理。Publisher是顺序事件的提供者,并且这些事件的数量可能没有上限,不过它也受背压机制的制约,按照Subscriber的反馈进行元素的生产。

Publisher是一个Java函数式接口(它仅仅声明了一个抽象方法), Subscriber可以把自己注册为该事件的监听方从而完成对Publisher事件的注册。

流量控制,包括Publisher与Subscriber之间的背压都是由Subscription管理的。

Flow.Publisher接口

@FunctionalInterface
public interface Publisher<T> {void subscribe(Subscriber<? super T> s);
}
//Subscriber接口提供了四个回调函数,这些回调函数会在Publisher生产对应事件时被调用。
public interface Subscriber<T> {void onSubscribe(Subscription s);void onNext(T t);void onError(Throwable t);void onComplete();
}
//这些事件的发布(以及对应方法的调用)都必须严格遵守下面协议定义的顺序:
// onSubscribe onNext* (onError | onComplete)? 
/**
这种表示法的含义是onSubscribe方法始终作为第一个事件被调用,接下来是任意多个onNext方法的调用。
事件流的处理可能持续不断,也可能借由onComplete回调方法终止,表面接下来没有更多需要处理的元素了,
抑或如果Publisher发生了失效,就会执行onError调用(可以对比从终端正常读取一个字符串,
或者读取到文件末尾,或者发生I/O错误的情况)​。Subscriber向Publisher注册时,
Publisher的第一个动作就是调用onSubscribe方法并回传一个Subscription对象。
Subscription接口定义了两个方法。
Subscriber可以使用第一个方法通知Publisher它已经准备好接收多少个事件,
第二个方法用于取消Subscription,
因此它的作用就是告诉Publisher它已经不再希望接收来自Publisher的事件。
**/
public interface Subscription {void request(long n);void cancel();
}

Java 9的Flow规范定义了一系列的规则,通过这些规则,协议的接口之间能相互沟通协调。

下面总结了这些规则的内容。

  • ❏ Publisher发送给Subscriber的元素数量不能超过其在Subscription的request方法中指定的数目。不过如果Subscription被onComplete方法成功地终止,或者Subscription执行过程中发生了错误,调用了onError方法,Publisher也可能还没达到设定的数量就停止调用onNext向Subscriber发送元素了。发生这种情况,Subscription就变成了终止状态(即onComplete或者onError), Publisher无法再向Subscriber发送任何信号,对应的Subscription只能被看作取消了。

  • ❏ Subscriber必须告知Publisher它是否已经准备好接收数据以及能够处理多少元素。凭借这种方式,Subscriber向Publisher执行了“背压”操作,有效地避免了Subscriber被超载数据压垮的情况发生。此外,执行onComplete或者onError操作时,Subscriber不能再次调用Publisher或者Subscription中的方法,这个时刻的Subscription已经被取消了。最后,发出Subscription.cancel()调用后,即使还未执行Subscription.request()方法,也没有通过onNext接收到任何消息,Subscriber也要准备好进行终止操作。

  • ❏ Subscription只能被一对Publisher和Subscriber共享,这代表了它们之间独一无二的关系。基于这个原因,Subscriber可以从onSubscribe和onNext方法中以异步方式调用它的request方法。标准还规定了Subscription.cancel()方法的实现必须是幂等(即调用它一次与重复调用多次的效果是同样的)和线程安全的,这样才能保证执行完第一次调用后,任何对Subscription的额外调用都不会有副作用。执行Subscription.cancel()调用后,Publisher会彻底删除对应Subscriber的引用。规则不推荐大家重复订阅同一个Subscriber,但是它并没有强制发生这种情况时抛出异常,因为所有之前取消的Subscription都需要妥善地保存下来。

    Flow类的第4个也是最后一个成员是Processor接口。它同时继承了Publisher和Subscriber,但没有额外添加新的方法。

  • 一个典型应用的生命周期,它实现了Flow API中定义的接口。

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }

这个接口反映的就是反应式流中事件的转化阶段。

接收到错误时,Processor可以选择从出错状态恢复(接着需要将该Subscription设置为取消状态),或者直接向Subscriber抛出onError信号。

当最后一个Subscriber取消其Subscription时,Processor也应该取消其上游的Subscription以传递该取消信号(尽管规范中并未严格规定此时一定要执行这样的取消操作)。

创建你的第一个反应式应用

大多数情况下,不建议直接去实现Flow类中定义的接口。非比寻常地,Java 9库也并未提供实现它们的类。这些接口的实现借由前面提到的反应式库(譬如Akka、RxJava等)完成。Java 9的java.util.concurrency.Flow规范既是所有实现该接口的库需要遵守的合约,也是使构建于不同的反应式库之上的应用间能相互协调、相互理解沟通的通用语言。反应式库一般都提供了更丰富的特性(除了由java.util.concurrency.Flow接口定义的最小功能集外,它们往往提供了更多对反应式流进行转换和归并的类和方法)。

直接基于Java 9的Flow API创建你的第一个反应式应用对于理解这四个接口之间是如何工作的非常有价值。

  • ❏ TempInfo,它模拟一个远程温度计(持续不断地回报温度,温度的值是随机生成的,介于华氏0度到99度之间,这也是适合大多数美国城市的温度区间);
  • ❏ TempSubscriber,它监听这些温度报告事件,并打印输出某个城市的温度监控器返回的温度Stream。

表示当前汇报温度的Java Bean

会在Subscriber请求温度报告时返回对应的数据Subscription接口实现,向Subscriber发送TempInfo Stream

创建Subscriber,每当它从Subscription拿到一个新元素,就打印输出温度,并继续请求新的数据

创建Publisher并向其订阅TempSubscriber

getTemperatures方法返回的是一个Lambda表达式,接受一个Subscriber对象作为参数,并调用它的onSubscribe方法。调用onSubscribe方法时,向其传入的参数是一个新创建的TempSubscription实例。由于这个Lambda表达式的签名与Publisher函数式接口中唯一的抽象方法保持一致,因此Java编译器会自动地将该Lambda表达式转换为Publisher对象。main方法为纽约的温度创建了一个Publisher,接着向它注册了一个新的TempSubscriber类实例。

温度数据所构成的Stream会被TempInfo工厂方法随机抛出的异常中断,这个问题被隐藏了

可行的解决方案是在TempSubscription中添加Executor,使用它通过另外一个线程向TempSubscriber发送新的元素。

为TempSubscription添加Executor

使用Processor转换数据

Processor身兼两职,它既是一个Subscriber也是一个Publisher。实际上,经常将它注册到一个Publisher上,接收并转换完数据后,再把这些数据重新发布出去。这里举一个实际的例子,要求是实现一个Processor,它注册到一个发布以华氏温度表示温度数据的Publisher上,需要将接收到的数据转换为摄氏温度并重新发布出去

将温度由华氏温度转换为摄氏温度的Processor

onNext是TempProcessor类中唯一一个包含业务逻辑的方法,它在将温度由华氏温度转换为摄氏温度后将其重新发布出去。所有其他实现Subscriber接口的方法都仅仅做了个二传手,把接收到的信号原封不动地传递给上游的Subscriber,Publisher的subscribe方法将上游的Subscriber注册到Processor中。

创建Publisher并向其注册TempSubscriber

:::info
New York : 10

    New York : -12New York : 23Error!

:::

构成Flow API思想核心的是它基于“发布-订阅”协议的异步流处理模型,

为什么Java并未提供Flow API的实现

Java提供的List接口已经被非常多的类实现了,其中包括ArrayList。更确切地说(这部分内容一般用户可能没那么关心)类ArrayList继承自抽象类AbstractList,而后者实现了LIst接口。与此相反,Java 9声明了Publisher接口,可是没有提供任何实现,这也是你只能定义自己版本实现的原因(当然,实现这些接口也能帮助你更好地学习它们,不过这并非其初衷)。

反应式流有多个Java库的实现版本(譬如Akka和RxJava)

最初这些库都是独立开发的,虽然它们都基于“发布-订阅”的思想实现了反应式编程,但是使用的术语和API是迥异的。

在Java 9标准化的过程中,这些库也在不断演进,最终它们都实现了java.util.concurrent.Flow接口,不再是仅仅实现了反应式的概念。标准化使得不同库之间互通和调用成为可能。

构建一个反应式流的实现相当复杂,因此大多数用户都倾向于使用现有的库。大多数实现接口的类库都会提供更加丰富的功能,而不是仅限于接口的最小实现集。

使用反应式库RxJava

RxJava是支持反应式编程的首批Java语言库之一。它诞生于Netflix,是对微软.Net环境中反应式扩展(reactive extension,Rx)项目的迁移。

/**使用Java语言时,如果你使用了一个第三方的库,很容易就能识别,因为你需要使用import导入第三方库。
举例来说,为了使用Publisher,你导入了Java的Flow接口,就需要使用下面这行声明:
**/
import java.lang.concurrent.Flow.*;
/**
不过如果你想要用Observable版本的Publisher,那么你还需要像下面这行代码那样,
导入对应的实现类。
**/
import io.reactivex.Observable;

有必要特别强调一个架构问题:优秀的系统架构通常会避免把仅在某个局部使用的细节概念暴露给整个系统。因此,一种推荐的做法是只在需要Observable的额外结构时使用Observable,否则就应该继续使用它的Publisher接口。注意,使用List接口时,你毫无疑问也应该遵循这一原则。有些时候即便你知道一个方法接受一个ArrayList类型的参数,为了避免暴露太多实现的细节或者限制未来潜在的变更,你可以将该参数的类型设置为List。

代码的设计带来了更多的灵活性,未来如何你需要变更实现,将参数由ArrayList替换成LinkedList,代码则不需要做大量的变更。

阅读RxJava文档后,会发现其中一个类是io.reactivex.Flowable类,提供了Java 9 Flow中基于拉模式的背压特性(通过request方式)。背压可以防止Subscriber被Publisher快速生成的大量数据压垮。

另一个类是RxJava最初始的版本,即io.reactivex.Observable的Publisher,它不支持背压。

这个类更容易使用,同时也更适用于用户接口事件(譬如鼠标移动)。这些事件都是不适合进行背压的流(想象一下,你怎么能让用户慢些移动鼠标,或者停止移动鼠标!)。

出于上述考虑,RxJava为处理通用流事件提供了这两个版本的类实现。

RxJava建议当你的流元素不超过一千个,或者你正处理的是基于图形用户界面的事件流,譬如鼠标移动或者触摸这些无法背压或不常发生的事件时,使用非背压版本的Observable

创建和使用Observable

Observable和Flowable类都提供了非常方便的工厂方法,使用它们你可以创建多种类型的反应式流(因为Observable和Flowable都实现了Publisher接口,所以这些工厂方法能够发布反应式流)。

//最简单的方式创建Observable,那么可以像下面这样通过创建预定数量元素的方式实现
Observable<String> strings = Observable.just( "first", "second" );

这里的just()工厂方法可以将一个或多个元素转换为Observable,这些Observable在适当的时候又会释放出对应的元素。

Observable的Subscriber会依次接收到onNext("first")onNext("second")以及onComplete()消息。

另一个比较常见的是Observable工厂方法,尤其是你的应用需要与用户执行实时交互的时候,它会按照固定的时间间隔发出事件

Observable<Long> onePerSec = Observable.interval(1, TimeUnit.SECONDS);
/**
interval工厂方法返回一个名为onePerSec的Observable,
它会以你选定的一个固定时间间隔(本例的时间间隔是一秒钟)​,
发送一个由long类型值组成的无限递增序列,这个序列由0开始计数。
接着,你可以用onePerSec作为另一个Observable的基础,每隔一秒反馈一次指定城市的温度报告。
**/

在RxJava中,Observable扮演了Flow API中Publisher的角色,因此Observable的行为与Flow中Subscriber接口的行为也很相似。

RxJava的Observable接口声明了Java 9 Subscriber同样的方法。

唯一的不同是,它的onSubscribe方法需要一个Disposable参数,而不是一个Subscription。

Observable不支持背压,因此它也没有构造Subscription的request方法。

//Observable接口的完整定义
public interface Observer<T> {void onSubscribe(Disposable d);void onNext(T t);void onError(Throwable t);void onComplete();
}

RxJava的API比Java 9的Flow API更灵活(提供了更多的重载变量)。譬如,订阅一个Observable对象时,你可以直接传递一个Lambda表达式给它,只提供onNext方法的签名,完全忽略其他三个方法都可以。

可以使用一个仅用接收事件的Consumer实现onNext方法的Observer去订阅一个Observable对象,onNext方法负责处理接收到的事件,其他方法都使用默认值,即事件处理完成或者发生异常时都不做操作。凭借这个特性,你只需要编写一行代码就可以订阅Observable onePerSec,打印输出纽约每秒钟的温度情况。

onePerSec.subscribe(i -> System.out.println(TempInfo.fetch( "New York" )));

onePerSec Observable每秒钟发出一个事件。接收到这条消息后,Subscriber就会尝试获取纽约的温度并打印输出。然而,如果把这条语句放到main方法中,并试图去执行它的话,不会看到任何输出,因为Observable执行每秒钟发布一条事件的线程是RxJava的计算线程池中的线程,它们都是守护线程。

main程序执行完就立刻退出了,结果导致守护线程还没产生任何输出就被终止了。

更好的解决方案是用blockingSubscribe方法调用当前线程(在这个例子中就是main函数所在的线程)的回调函数。使用blockingSubscribe是最合适的途径了。

onePerSec.blockingSubscribe(i -> System.out.println(TempInfo.fetch( "New York" ))
);

Observer实现只有正常的处理逻辑,不包含任何出错和失效的管理,譬如onError,因此一旦发生失效,这些错误就会作为未捕获的异常直接暴露给用户。

希望不仅要有出错管理,还要统计已有的数据。

要的不再是实时打印输出温度数据,而是为用户提供一个工厂方法,每秒返回一个包含温度数据的Observable对象,该对象在完成工作退出之前最多返回五次温度数据。

可以通过名为create的工厂方法借助Lambda创建Observable对象,该方法接受另一个Observable作为参数,返回值为void创建一个每秒一次返回温度的Observable对象

通过向一个函数传递ObservableEmiter,并向其发送对应的事件,创建了返回的Observable。RxJava的ObservableEmitter接口继承自RxJava的基础类Emitter。你可以把Emitter想象成不带onSubscribe方法的Observer

public interface Emitter<T> {void onNext(T t);void onError(Throwable t);void onComplete();
}

ObservableEmitter提供了更多的方法,用于替Emitter设置新的Disposable,或者检查某个序列是否已经被下游处理过了。

内部订阅一个Observable,就像onePerSec那样,以每隔一秒的频率发布一个无限递增的序列。在订阅函数(当然你还需要向订阅方法传递一个参数)的内部,你首先需要借助ObservableEmitter接口提供的isDisposed方法检查之前创建的Observer是否已经被处理了(如果上一个迭代中发生了错误,就会遭遇这种情况)。如果温度已经收集了五次,这段代码就会终止Observer对象,并关闭对应的流;否则就发送请求城市最新的温度报告给Observer对象。这段代码被包含在一个try/catch语句块中。如果获取温度时发生了错误或者异常,错误就会传递给Observer对象。

import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;public class TempObserver implements Observer<TempInfo> {@Overridepublic void onComplete() {System.out.println( "Done! " );}@Overridepublic void onError( Throwable throwable ) {System.out.println( "Got problem: " + throwable.getMessage() );}@Overridepublic void onSubscribe( Disposable disposable ) {}@Overridepublic void onNext( TempInfo tempInfo ) {System.out.println( tempInfo );}
}
/**
这个Observer与之前TempSubscriber很相似(TempSubscriber实现了Java 9的Flow.Subscriber)​,
但是这里做了进一步的简化。因为RxJava的Observable不支持背压,
所以处理完发布的事件后你不需要再调用request()方法请求更多的元素了。
**/

打印输出纽约温度的main类

:::info
假设这一次,温度获取过程中没有发生任何的错误,main函数每隔一秒打印输出一条温度记录,五次之后Observable发出了onComplete信号。这种情况下,你看到的输出可能是下面这样的:

      New York : 69New York : 26New York : 85New York : 94New York : 29Done!

:::

转换及整合多个Observable

与原生的Java 9 Flow API比较起来,RxJava及其他的三方反应式库主要的优势之一是它们往往提供了更加丰富的函数集,可以更灵活地对流进行整合、创建以及过滤操作。一个流可以作为另一个流的输入。

Java 9的Flow.Processor,它可以对流中的数据进行转换,譬如将温度由华氏温度转换为摄氏温度。还可以过滤流中的数据,找出关心的元素创建一个新的流,然后使用特定的映射函数对这些元素进行转换(这些都可以通过Flow.Processor实现),甚至可以通过多种方式合并或整合两个流(这些目前还无法通过Flow.Processor实现)。

流的转换与合并函数非常复杂。RxJava文档对它提供的mergeDelayError函数的描述:

:::info
对向一个Observable发送多个Observable的Observable进行扁平处理。它允许一个Observer从所有的源Observable接收多个成功发送的元素,并且该操作不会被某一个Observable发送失败所影响,同时你还可以控制这些Observable对象上并发的订阅数目。

:::

反应式流社区决定以一种可视化的方式描述这些函数的行为。这种可视化的方式叫作弹珠图。

通过水平线上的几何图形表示反应式流中元素的临时顺序;通过特殊符号表示错误以及事件完成的信号。图中的方框表示命名操作是如何转换那些元素或者整合多个流的。

弹珠图示例——文档化典型反应式库的操作

对RxJava库中所有的函数进行可视化表示。对map(转换由Observable发布的元素)和merge(将由两个或多个Observable发布的事件整合在一起)的可视化。函数map和merge对应的弹珠图

如何使用map和merge改进前一节中开发的RxJava示例,甚至为它增加新的特性。map能提供更加精准的控制,譬如在执行从华氏温度到摄氏温度的转换时,用map就比直接使用Flow API的Processor要灵活得多

//使用map处理Observable实现从华氏温度到摄氏温度的转换
public static Observable<TempInfo> getCelsiusTemperature(String town) {
return getTemperature( town )
.map( temp -> new TempInfo( temp.getTown(),(temp.getTemp() -32) * 5 / 9) );
}

简短的方法中getTemperature方法返回的Observable对象,返回一个新的Observable,这个Observable以每秒一个的频率,将Observable返回的温度由华氏温度转换为摄氏温度并发布出去

过滤出那些值为负的温度

Observable类的filter方法接受一个Predicate做参数,返回一个新的Observable,这个新的Observable只发布符合Predicate定义要求的元素。假设你需要开发一个预警系统,在有结冰的危险时,提醒用户做好相应的预防措施。

public static Observable<TempInfo> getNegativeTemperature(String town) {
return getCelsiusTemperature( town )
.filter( temp -> temp.getTemp() < 0 );
}
//对上述方法进行泛化,允许用户设定城市时,既可以指定单一城市,也可以指定由多个城市组成的集合,
//但返回的依旧是发布温度数据的Observable对象。
//使用merge合并多个城市的温度
public static Observable<TempInfo> getCelsiusTemperatures(String... towns) {return Observable.merge(Arrays.stream(towns).map(TempObservable::getCelsiusTemperature).collect(toList()));
}
/**
这个方法中,接受查询城市的变量是一个变长参数,你可以指定一个城市的集合。
这个变长参数会被转换为一个字符串流,接着每个字符串会被传递给getCelsius-Temperature方法
通过这种方式,每个城市都被转换成了以每秒一次频率发布温度数据的Observable对象。
最终,这个Observable流被收集到了一个列表中,列表被传递给了Observable类自身的静态工厂方法merge。
该方法迭代遍历访问每一个Observable元素,并整合其输出,让它们的行为表现得就像一个单一的Observable对象一样。
**/
//最终的这个Observable会发布由Iterable传递的所有Observable对象发布的事件,并保持其原有的顺序。
public class Main {public static void main(String[] args) {Observable<TempInfo> observable = getCelsiusTemperatures("New York", "Chicago", "San Francisco" );observable.blockingSubscribe( new TempObserver() );}
}
/**New York : 21Chicago : 6San Francisco : -15New York : -3Chicago : 12San Francisco : 5Got problem: Error!
**/

小结

  • ❏ 反应式编程思想的正式提出是在反应式宣言中,它指出反应式软件必须具备四个相互关联的特性:响应性、韧性、弹性以及消息驱动。
  • ❏ 反应式编程原则通过微调,既可以用于构建单一应用,也可以用于设计反应式系统,整合多个应用。
  • ❏ 反应式应用基于反应式流承载的一个或多个事件流的异步处理。由于反应式流在开发反应式应用中的角色如此重要,Netflix、Pivotal、Lightbend以及Red Hat等多家公司成立了联盟,致力于推动反应式概念的标准化,试图打破不同反应式库之间的互操作性障碍。
  • ❏ 由于反应式流异步处理的天然特征,它们往往都自带背压机制。背压可以避免处理速度慢的消费方被高速的消息生产方压垮。❏ 反应式设计及其标准流程已经正式引入了Java。Java 9的Flow API定义了四个核心接口:Publisher、Subscriber、Subscription以及Processor。
  • ❏ 大多数情况下,这些接口不需要开发者直接去实现,它们主要作为实现反应式语义的第三方库的通用接口。
  • ❏ 应用最广泛的反应式库是RxJava,它(除了Java 9 Flow API中定义的那些基本特性之外)额外提供了很多便利而强大的操作。譬如,使用它提供的操作,你可以很便利地对单一反应式流中的元素进行转换和过滤,还可以整合和聚集多个流。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/869557.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

goal vs objective vs target

goal 680 objective 2421 target 1284GOAL vs OBJECTIVE left 4WORD 1: GOAL 过滤200WORD W1 W2SCORED 1423 1 He has scored a further five goals in the Spanish Supercup and the Champions League.他在西班牙超级杯和冠军联赛中又打进了五个进球。 scored Barcas fourth…

大模型备案流程-简易易懂

大模型备案除了资料撰写难度高外,难点还在于各省没有统一标准。备案流程、资料要求、考察重点都会有些许差异。不过,各省的大体申报流程都如下文所示(各省主要差异点我会标出,具体内容可以一起沟通交流): 一、备案申请 报请申请者所在省份/直辖市/自治区网信:向企业注册地…

KingbaseES RAC集群案例之---jmeter压测

KingbaseES RAC、jmeter案例说明: 通过jmeter压测,测试KingbaseES RAC集群负载均衡功能。 数据库版本: test=# select version();version ---------------------KingbaseES V008R006 (1 row)测试架构:一、jmeter版本 1、系统jiava版本 [root@node203 ~]# java -version ope…

{LOJ #6041. 「雅礼集训 2017 Day7」事情的相似度 题解

\(\text{LOJ \#6041. 「雅礼集训 2017 Day7」事情的相似度 题解}\) 解法一 由 parent 树的性质得到,前缀 \(s_i,s_j\) 的最长公共后缀实质上就是 \(i,j\) 在 SAM 中的 \(\operatorname{LCA}\) 在 SAM 中的 \(\operatorname{len}\)。让我们考虑如何处理 \((l,r)\) 区间内的询问…

解决Hyper-V保留端口导致各种端口占用报错的问题

0.有时候在本地启用一个服务比如MySQL服务,或者在启用IDEA的调试的时候,或者在本地启用一个监听端口的时候可能会出现监听失败的情况,经过查找之后会发现并没有应用占用相应的端口。 1.经过查找发现其实是在启用了Hyper-V之后系统会保留一些端口,这些端口如果包含了你应用要…

D. Madoka and The Corruption Scheme -- (贪心,组合数学,构造)

题目链接:Problem - D - Codeforces 题目大意: 一共n轮比赛,有\(2^n\)个参赛者,第\(i\)轮有\(2^{n - i}\) 场比赛,Madoka能安排第一局的比赛,她想让最后的赢家编号更小,主办方最多有k次操作,能修改任意每一场比赛的获胜情况,可以让最终赢家编号更 大,求Madoka在主办方…

PHP语法基础

PHP语法基础php文档拓展名是.phpphp文件通常包含html标签以及一些php脚本运行代码 ,注意:html js css可以在php文件执行但是,php不能在html js css在php文件执行php语法用;结尾 <!DOCTYPE html> <html> <body> <h1>我的第一张php页面><h1>…

本地打包docker images并上传到服务器.250115

情景: 服务器docker Pull 拉不下来 docker pull easzlab/kubeasz-k8s-bin:v1.31.2 Get "https://registry-1.docker.io/v2/": net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers) 2025-01-14 17:06:35 [ez…

包豪斯学院

包豪斯学院(Bauhaus)是20世纪最具影响力的艺术与设计学府之一,创立于1919年,由建筑师沃尔特格罗皮乌斯(Walter Gropius)在德国魏玛建立。作为现代主义设计的先锋,包豪斯不仅在建筑、工艺、艺术和设计等领域开创了新局面,其设计理念更是深刻影响了全球的艺术与工业生产方…

主机PHP版本过低导致网页无法正常运行的解决办法

问题描述: 用户发现其主机上的PHP版本过低,导致某些功能无法正常使用,影响了网站的整体性能。此外,用户询问是否可以通过升级主机获得免费域名赠品,以及数据库空间不足的问题。 解决方案: 针对您遇到的主机PHP版本过低的问题,这里提供一些解决方案和建议,帮助您顺利升级…

如何解决网站在多台电脑上打开速度慢的问题

问题描述: 用户反馈,其家庭和单位的电脑在访问某个特定域名时速度非常慢,但手机端访问速度正常。此外,用户还提到服务器存在大量漏洞,担心网站安全问题,并询问如何处理这些漏洞。 解决方案: 针对您提到的家庭和单位电脑访问域名速度慢的问题,我们首先需要排查以下几个方…