初见-响应式编程-002

 🤗 ApiHug × {Postman|Swagger|Api...} = 快↑ 准√ 省↓

  1. GitHub - apihug/apihug.com: All abou the Apihug   
  2. apihug.com: 有爱,有温度,有质量,有信任
  3. ApiHug - API design Copilot - IntelliJ IDEs Plugin | Marketplace

#Reactive

The Reactive Manifestoopen in new window:

Systems built as Reactive Systems are more flexible, loosely-coupled and scalable. This makes them easier to develop and amenable to change. They are significantly more tolerant of failure and when failure does occur they meet it with elegance rather than disaster. Reactive Systems are highly responsive, giving users effective interactive feedback.

  1. flexible,
  2. loosely-coupled
  3. scalable

  1. Responsive, 响应时间, 服务质量
  2. Resilient, 容错, 恢复能力
  3. Elastic, 弹性,动态扩容
  4. Message Driven, 事件驱动,低耦合

#Reactor

Reactor is an implementation of the Reactive Programming paradigm, which can be summed up as follows:

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s). — https://en.wikipedia.org/wiki/Reactive_programmingopen in new window

最初 微软创建了 .NET 里的 Reactive Extensions (Rx); RxJava 实现了 JVM 上的 Reactive编程模式, 最终 JAVA 9 融入了 Flowopen in new window -- java.util.concurrent.Flowopen in new window

在 OO 编程里面 reactive 常被当做 Observer 观察者设计模式, 当然你可把 Reactive Stream 和你熟悉的 Iterator 设计模式做对比;两种实现里面都有Iterable-Iterator 两个概念, 主要的不一样在, Iterator 是一种 拉 pull 模式, 而 reactive 是一种push 推模式。

在两个流程中都有 next(), 在 reactive stream 更类似于 Publisher-Subscriber 模式,由Publishr 来控制新到的value 给 Subscriber; push 是 reactive 里面非常重要的一面;

程序实现着注重收到 value 后的计算逻辑 Operation, 而不是整个控制流程。

整个流程里面喂入数据 push 自然是整个响应流程里面最核心的流程, onNext 用来完成此动作, 还包含 错误 onErro() 和最终的结束处理 onComplete(), 整个流程可以被抽象为:

onNext x 0..N [onError | onComplete]

整个流程的处理可以说非常灵活, 可以有 0 个, 1个, N 个, 或者无限多的数据, 比如一个定时器。

回到问题的本质, WHY 我们为什么需要 异步的 reactive 模式呢?

#Asynchronicity 能解决

并行, 多核已经是常态来增加吞吐量和响应时间。

多线程用来最大化利用资源; 但是多线程异步可以解决问题通知, 带来了很大的挑战。 JVM 解决此引入两个概念:

  1. callback, 异步方法用来通知结果, 一般是一个内部类,或者一个 lamdba 表达式
  2. future,异步调用立即返回一个 Future<T>, 但是结果 T 尚不能立即获得, 结果获得后才能通过 poll 获得。 ExecutorService 跑 Callable<T> 时返回 Future。
#Callback地狱

callback 贯穿整个链路的调用过程:

userService.getFavorites(userId, new Callback<List<String>>() { public void onSuccess(List<String> list) { if (list.isEmpty()) { suggestionService.getSuggestions(new Callback<List<Favorite>>() {public void onSuccess(List<Favorite> list) { UiUtils.submitOnUiThread(() -> { list.stream().limit(5).forEach(uiList::show); });}public void onError(Throwable error) { UiUtils.errorPopup(error);}});} else {list.stream() .limit(5).forEach(favId -> favoriteService.getDetails(favId, new Callback<Favorite>() {public void onSuccess(Favorite details) {UiUtils.submitOnUiThread(() -> uiList.show(details));}public void onError(Throwable error) {UiUtils.errorPopup(error);}}));}}public void onError(Throwable error) {UiUtils.errorPopup(error);}
});

如果换成 reactor:

userService.getFavorites(userId) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup); 

reactor 不仅仅让整个流程更精简, 通知提供服务质量控制(类似熔断), 比如我们保证整个服务质量在 800ms内返回,超时后从 fallback cache 或者其他获取:

userService.getFavorites(userId).timeout(Duration.ofMillis(800)) .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()).take(5).publishOn(UiUtils.uiThreadScheduler()).subscribe(uiList::show, UiUtils::errorPopup);
#Future

Future 避免了回调地狱, 但是依然不太容易进行组装, 虽然在 Java 8 里面引入了 CompletableFuture, 编制多个 Future 在一起虽然可以操作,但是Future 还是有其他问题:

  1. Future 的 get() 依然是阻塞的
  2. 不支持延迟计算
  3. 缺乏对多结果的支持, 更好的错误处理

这样一个业务场景; 从一个 ID 列表, 去查询他们的name + 统计, 所有的都是异步 CompletableFuture 例子:

CompletableFuture<List<String>> ids = ifhIds(); CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { Stream<CompletableFuture<String>> zip =l.stream().map(i -> { CompletableFuture<String> nameTask = ifhName(i); CompletableFuture<Integer> statTask = ifhStat(i); return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); });List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); return allDone.thenApply(v -> combinationList.stream().map(CompletableFuture::join) .collect(Collectors.toList()));});List<String> results = result.join(); 
assertThat(results).contains("Name NameJoe has stats 103","Name NameBart has stats 104","Name NameHenry has stats 105","Name NameNicole has stats 106",

reactor 更紧凑解决方案, 更精炼容易理解:

Flux<String> ids = ifhrIds(); Flux<String> combinations =ids.flatMap(id -> { Mono<String> nameTask = ifhrName(id); Mono<Integer> statTask = ifhrStat(id); return nameTask.zipWith(statTask, (name, stat) -> "Name " + name + " has stats " + stat);});Mono<List<String>> result = combinations.collectList(); List<String> results = result.block(); 
assertThat(results).containsExactly( "Name NameJoe has stats 103","Name NameBart has stats 104","Name NameHenry has stats 105","Name NameNicole has stats 106","Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);

#响应编程

除非上面我们看到, 让编码更清晰直观, reactor 等还在这些方面上花了很多心思:

  1. Composability and readability; 组件化, 可读性好。
  2. Data as a flow manipulated with a rich vocabulary of operators, 数据管道铺好, 自由搭配运算逻辑。
  3. Nothing happens until you subscribe, 延迟计算 subscribe 触发计算。
  4. Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high, 背压控制, consumer 和 producer配合,消费端和生产端。
  5. High level but high value abstraction that is concurrency-agnostic; 润物细无声,抽象的让你感知不到并发。

Composability and Readability 包括可以自由的组建编制任务, 任务之间的依赖关系, 上下关系, 或者同步运行的 fork-join 风格, 高度抽象层使用异步任务。

流水线一样的操作, Reactor 既是传送带也是工作站, 原料从 Publisher 最终成品发送到消费端 Subscriber

Operators, 相当于流水线上的工作站, 整个流水线上就是上一个 Publisher 的产物, 然后包装发送到下一个 Publisher, 最终到一个 Subsccriber 里面。

延迟计算, 在Reactor 里当你写一个 Publisher 链条, 数据默认是不会启动起来, 你只是创建了一个异步处理的抽象流程(Spark 里的RDD, 或者像一个流程的 DSL)。

当你触发 subscribing 时候, 将 Publisher 和一个 Subscriber 绑定, 同事触发数据流, 流入到整个链中, 内部通过 Subscriber 触发一个 request 信号传递到上游, 最终到源的 Publisher

背压, 下游将信号传递给上游是用来实现 backpressure背压的一种方式, 依然用流水线的比方, 当下游的工作站赶不上上游的速度的时候需要反馈一个信号到上游去。

A subscriber can work in unbounded mode and let the source push all the data at its fastest achievable rate or it can use the request mechanism to signal the source that it is ready to process at most n elements.

可以 push-pull 方式混合, 下游容量自由控制上游的推送速度, 或者懒式的拉取。

Cold流和Hot流

  1. Cold流不论订阅者在何时订阅该数据流,总是能收到数据流中产生的全部消息。
  2. Hot流则是在持续不断地产生消息,订阅者只能获取到在其订阅之后产生的消息。

冷例子:

@Testpublic void cold_example() {Flux<String> source =Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple")).map(String::toUpperCase);source.subscribe(d -> System.out.println("Subscriber 1: " + d));source.subscribe(d -> System.out.println("Subscriber 2: " + d));
}

输出结果:

Subscriber 1: BLUE
Subscriber 1: GREEN
Subscriber 1: ORANGE
Subscriber 1: PURPLE
Subscriber 2: BLUE
Subscriber 2: GREEN
Subscriber 2: ORANGE
Subscriber 2: PURPLE

所有的 subscriber 都能得到结果。

热例子:

@Testpublic void hot_example() {Sinks.Many<String> hotSource = Sinks.unsafe().many().multicast().directBestEffort();Flux<String> hotFlux = hotSource.asFlux().map(String::toUpperCase);hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: " + d));hotSource.emitNext("blue", FAIL_FAST);hotSource.tryEmitNext("green").orThrow();hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: " + d));hotSource.emitNext("orange", FAIL_FAST);hotSource.emitNext("purple", FAIL_FAST);hotSource.emitComplete(FAIL_FAST);}

得到结果:

Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE

#结论

从 reactive 宣言我们看到响应式编程的 '理想', 初探 reactor, 我们看到 reactor 的强大表达力, 这些还只是管中窥豹, 更多的等待我们下面章节去探索和挖掘。

测试项目 Reactor_001_testopen in new window

#参考

  1. The Reactive Manifestoopen in new window
  2. reactive-streams-jvmopen in new window
  3. reactive-streamsopen in new window
  4. java 9 flowopen in new window
  5. Cold流和Hot流open in new window
  6. Flux 详实的流程图open in new window
  7. Mono 详实的流程图

我们

api-hug-contact

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/643847.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

力扣HOT100 - 108. 将有序数组转换为二叉搜索树

解题思路&#xff1a; 二叉搜索树一般使用中序遍历 class Solution {public TreeNode sortedArrayToBST(int[] nums) {return helper(nums,0,nums.length-1);}public TreeNode helper(int[] nums,int left,int right){if(left>right) return null;//确定根节点//总是选择中…

链表操作III

看这篇文章之前&#xff0c;可以先看看链表操作I和链表操作II。而这篇文章主要是想说明两道关于链表环的问题。 环形链表 给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则…

ffmpeg支持MP3编码的方法

目录 现象 解决办法 如果有编译包没有链接上的情况 现象 解决办法 在ffmpeg安装包目录下 &#xff0c;通过./configure --list-encoders 和 ./configure --list-decoders 命令可以看到&#xff0c;ffmpeg只支持mp3解码&#xff0c;但是不支持mp3编码。 上网查寻后发现&…

【C++初阶】List使用特性及其模拟实现

1. list的介绍及使用 1.1 list的介绍 1. list是可以在常数范围内在任意位置进行插入和删除的序列式容器&#xff0c;并且该容器可以前后双向迭代。 2. list的底层是双向链表结构&#xff0c;双向链表中每个元素存储在互不相关的独立节点中&#xff0c;在节点中通过指针指向其前…

【PHP开发工程师详细讲解分析】——网站注册账号(头像的上传操作),让自己喜欢的头像更换畅通无阻

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;开发者-曼亿点 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 曼亿点 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a…

设计模式-00 设计模式简介之几大原则

设计模式-00 设计模式简介之几大原则 本专栏主要分析自己学习设计模式相关的浅解&#xff0c;并运用modern cpp 来是实现&#xff0c;描述相关设计模式。 通过编写代码&#xff0c;深入理解设计模式精髓&#xff0c;并且很好的帮助自己掌握设计模式&#xff0c;顺便巩固自己的c…

鸿蒙OpenHarmony【小型系统运行案例】 (基于Hi3516开发板)

运行 启动系统 在完成Hi3516DV300的烧录后&#xff0c;还需要设置BootLoader引导程序&#xff0c;才能运行OpenHarmony系统。 在Hi3516DV300任务中&#xff0c;单击Configure bootloader&#xff08;Boot OS&#xff09;进行配置即可。 说明&#xff1a; DevEco Device Tool…

02 IO口的操作

文章目录 前言一、IO的概念1.IO接口2.IO端口 二、CPU和外设进行数据传输的方法1.程序控制方式1.1 无条件1.2 查询方式 2.中断方式3.DMA方式 一、方法介绍和代码编写1.前置知识2.程序方式1.1 无条件方式1.1.1 打开对应的GPIO口1.1.2 初始化对应的GPIO引脚1.1.2.1 推挽输出1.1.2.…

Docker常用命令(镜像、容器、网络)

一、镜像 1.1 存出镜像 将镜像保存成为本地文件 格式&#xff1a;docker save -o 存储文件名 存储的镜像docker save -o nginx nginx:latest 1.2 载入镜像 将镜像文件导入到镜像库中 格式&#xff1a;docker load < 存出的文件或docker load -i 存出的文件…

【大语言模型LLM】- Meta开源推出的新一代大语言模型 Llama 3

&#x1f525;博客主页&#xff1a;西瓜WiFi &#x1f3a5;系列专栏&#xff1a;《大语言模型》 很多非常有趣的模型&#xff0c;值得收藏&#xff0c;满足大家的收集癖&#xff01; 如果觉得有用&#xff0c;请三连&#x1f44d;⭐❤️&#xff0c;谢谢&#xff01; 长期不…

c++的策略模式,就是多态

一、定义&#xff1a; 策略模式定义了一系列的算法&#xff0c;并将每一个算法封装起来&#xff0c;而且使它们还可以相互替换。 策略模式让算法独立于使用它的客户而独立变化。 二&#xff0c;核心 抽象策略&#xff08;抽象基类&#xff09;&#xff08;Strategy&#xff09…

【01-机器学习入门:理解Scikit-learn与Python的关系】

文章目录 前言Python与机器学习Scikit-learn简介Scikit-learn与Python的关系使用Scikit-learn进行机器学习结语前言 在当今的数据科学和人工智能领域,机器学习已经成为了一个不可或缺的组成部分。而对于那些刚刚踏入这一领域的新手来说,理解机器学习的基本概念和找到合适的工…