Java学习笔记-day05-响应式编程初探-自定义实现Reactive Streams规范

最近在学响应式编程,这里先记录下,响应式编程的一些基础内容

1.名词解释

Reactive Streams、Reactor、WebFlux以及响应式编程之间存在密切的关系,它们共同构成了在Java生态系统中处理异步和响应式编程的一系列工具和框架。

  1. Reactive Streams:

    • Reactive Streams 是一个规范,定义了一组接口和协议,用于处理异步数据流的背压。它包括发布者(Publisher)、订阅者(Subscriber)、订阅(Subscription)和处理器(Processor)等接口。
    • Reactive Streams 规范的目标是提供一种标准的方式来处理异步数据流,解决背压问题。Java标准库从Java 9开始提供了 java.util.concurrent.Flow 类,实现了Reactive Streams规范。
  2. Reactor:

    • Reactor 是一个基于Reactive Streams规范的响应式编程框架。它提供了一组用于构建异步、事件驱动、响应式应用程序的工具和库。Reactor 的核心是 Flux(表示一个包含零到多个元素的异步序列)和 Mono(表示一个包含零或一个元素的异步序列)。
    • Reactor 通过提供响应式的操作符,如mapfilterflatMap等,使得开发者能够方便地进行数据流的转换和处理。
  3. WebFlux:

    • WebFlux 是Spring Framework 5引入的响应式编程支持。它构建在 Reactor 之上,提供了一套用于构建异步、非阻塞、响应式的Web应用程序的API。WebFlux支持使用Reactive Streams处理HTTP请求和响应。
    • Spring WebFlux 可以用于构建反应式的RESTful服务,支持使用注解的方式定义路由和处理器函数。
  4. 响应式编程:

    • 响应式编程是一种编程范式,强调数据流和变化的传播。在这个范式中,数据源产生数据并通知观察者,观察者相应地处理这些数据。这种方式更容易处理异步操作和事件。
    • 在Java中,响应式编程通常涉及到使用类似于Reactor或RxJava的库,这些库提供了响应式的操作符和工具。

综上所述,Reactive Streams 提供了规范,Reactor 是一个实现了该规范的响应式编程框架,而WebFlux是Spring对于响应式编程的支持。它们共同致力于构建异步、非阻塞、响应式的应用程序。响应式编程则是一种更广义的编程范式,与Reactive Streams和Reactor等具体实现密切相关。

2.Reactive Streams 规范

2.1.Reactive Streams规范定义

java.util.concurrent.Flow 类中,定义了Reactive Streams规范
在这里插入图片描述

  • Publisher(发布者):负责生成数据流,并向订阅者发送数据。
  • Subscriber(订阅者):表示数据流的消费者,它订阅一个或多个发布者,并接收数据。
  • Subscription(订阅):表示订阅关系的接口,用于控制数据流的请求和取消。
  • Processor(处理器):充当发布者和订阅者的中间组件,可以对数据进行转换和处理。

2.2.API方法

1. Publisher(发布者):
interface Publisher<T> {void subscribe(Subscriber<? super T> subscriber);
}
  • subscribe(Subscriber<? super T> subscriber) 用于订阅数据流。当订阅者调用这个方法时,发布者将建立与订阅者的订阅关系,并开始推送数据。
2. Subscriber(订阅者):
interface Subscriber<T> {void onSubscribe(Subscription subscription);void onNext(T item);void onError(Throwable throwable);void onComplete();
}
  • onSubscribe(Subscription subscription) 在订阅关系建立时调用。通过这个方法,订阅者可以持有 Subscription 对象,以便后续请求数据和取消订阅。

  • onNext(T item) 在接收到新元素时调用。订阅者通过这个方法处理收到的数据。

  • onError(Throwable throwable) 在数据流中出现错误时调用。订阅者通过这个方法处理错误情况。

  • onComplete() 在数据流完成时调用。通知订阅者数据流结束,不再有新的元素。

3. Subscription(订阅):
interface Subscription {void request(long n);void cancel();
}
  • request(long n) 用于请求订阅者处理指定数量的元素。订阅者通过这个方法告知发布者它可以处理多少个元素。

  • cancel() 用于取消订阅关系。当订阅者不再需要接收数据时,调用此方法取消订阅。

4. Processor(处理器):
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Processor 接口是 SubscriberPublisher 的组合,表示一个中间处理组件,可以同时充当订阅者和发布者的角色。

  • Subscriber 部分的方法:onSubscribe(Subscription subscription), onNext(T item), onError(Throwable throwable), onComplete()

  • Publisher 部分的方法:subscribe(Subscriber<? super R> subscriber)。表示 Processor 可以被其他订阅者订阅。

5.泛型T

泛型T即为数据流

这些方法共同构成 Reactive Streams 协议,定义了发布者和订阅者之间的协作方式,以及订阅者如何处理数据流。在实际的使用中,这些方法的实现通常需要考虑异步处理、背压机制等方面,以确保响应式编程的目标得以实现。

2.3.工作流程

在 Reactive Streams 中,PublisherSubscriberSubscriptionProcessor 之间的协作流程如下:

有时间再补流程图
在这里插入图片描述

  1. Publisher(发布者):

    • Publisher 是异步产生数据流的组件,它通过 subscribe 方法允许订阅者订阅。subscribe 方法会接收一个 Subscriber 对象作为参数。
    • Publisher 有新数据准备好时,通过调用订阅者的 onNext 方法将数据推送给订阅者。
    interface Publisher<T> {void subscribe(Subscriber<? super T> subscriber);
    }
    
  2. Subscriber(订阅者):

    • Subscriber 是数据流的消费者,通过实现 Subscriber 接口来接收来自发布者的数据。订阅者通过调用 subscription.request(n) 请求一定数量的数据,处理数据时通过 onNext 方法接收元素。
    • 当订阅者无法处理更多的元素时,可以调用 subscription.cancel() 来取消订阅。
    interface Subscriber<T> {void onSubscribe(Subscription subscription);void onNext(T item);void onError(Throwable throwable);void onComplete();
    }
    
  3. Subscription(订阅):

    • Subscription 表示订阅关系,它在 onSubscribe 方法中被传递给订阅者。通过 Subscription,订阅者可以请求数据和取消订阅。
    • 订阅者通过 request(long n) 方法请求处理 n 个元素,通过 cancel() 方法取消订阅。
    interface Subscription {void request(long n);void cancel();
    }
    
  4. Processor(处理器):

    • Processor 是一个同时实现了 PublisherSubscriber 接口的中间组件,可以作为数据流的处理器,对数据进行转换和处理。
    • Processor 既能接收数据,也能发布数据。它将 onNextonErroronComplete 方法委托给下游的订阅者,并将数据推送给上游的发布者。
    interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
    

这些接口一起构成了 Reactive Streams 的基本协议。发布者产生数据,订阅者订阅数据流并通过 onNext 方法接收元素,订阅者通过 request 方法请求处理一定数量的元素,同时可以通过 cancel 方法取消订阅。Processor 则可以用于在订阅者和发布者之间进行数据转换和处理。在 Reactive Streams 的实现中,这些接口的方法调用是异步进行的,以支持非阻塞的数据流处理。

3.自定义实现Reactive Streams规范

自己实现了一个,参考了SubmissionPublisher

  • 同步实现的
  • 功能不完善
  • 有bug
class MyPublisher implements Flow.Publisher<String>{MySubscription<String> subscription;public int request ;public void publish(String item){subscription.items.add(item);while (true) {if (request > 0) {for (int i = 0; i < request; i++) {if (!subscription.items.isEmpty()) {try {Object o = subscription.items.get(subscription.items.size() - 1);subscription.subscriber.onNext(o.toString());subscription.items.remove(o);}catch (Exception e){subscription.subscriber.onError(e);return;}}}}if (subscription.items.isEmpty()) {break;}}}@Overridepublic void subscribe(Flow.Subscriber<? super String> subscriber) {System.out.println("第一步:绑定订阅者" );MySubscription<String> subscription = new MySubscription<>(subscriber,this);this.subscription = subscription;subscriber.onSubscribe(subscription);}}class MySubscriber implements Flow.Subscriber<String>{private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {System.out.println("第二步:接收Subscription" );this.subscription = subscription;// 请求订阅者处理的元素数量subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("第四步:推送数据" );System.out.println("MySubscriber 消费了item = " + item);subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.out.println("出异常了 = " + throwable);}@Overridepublic void onComplete() {}}class MySubscription<T> implements Flow.Subscription{final Flow.Subscriber<? super T> subscriber;final MyPublisher publisher;List items = new ArrayList();public MySubscription(Flow.Subscriber<? super T> subscriber, MyPublisher publisher) {this.subscriber = subscriber;this.publisher = publisher;}@Overridepublic void request(long n) {this.publisher.request++;System.out.println("第三步:拉取请求" );}@Overridepublic void cancel() {}
}
public class FlowDemo {public static void main(String[] args) {MyPublisher myPublisher = new MyPublisher();MySubscriber mySubscriber = new MySubscriber();myPublisher.subscribe(mySubscriber);myPublisher.publish("111");myPublisher.publish("222");myPublisher.publish(null);}
}

4.Jdk实现Reactive Streams使用示例

class SimplePublisher implements Flow.Publisher<Integer> {private final SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();public void publishItems() {for (int i = 1; i <= 5; i++) {publisher.submit(i);}// 发布者完成发布publisher.close();}@Overridepublic void subscribe(Flow.Subscriber<? super Integer> subscriber) {publisher.subscribe(subscriber);}
}class SimpleSubscriber implements Flow.Subscriber<Integer> {private Flow.Subscription subscription;@Overridepublic void onSubscribe(Flow.Subscription subscription) {this.subscription = subscription;// 请求订阅者处理的元素数量subscription.request(1);}@Overridepublic void onNext(Integer item) {System.out.println("Received item: " + item);// 处理完一个元素后请求下一个subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.err.println("Error occurred: " + throwable.getMessage());}@Overridepublic void onComplete() {System.out.println("Processing completed.");}
}public class ReactiveStreamsExample {public static void main(String[] args) throws InterruptedException {// 创建发布者和订阅者SimplePublisher simplePublisher = new SimplePublisher();SimpleSubscriber simpleSubscriber = new SimpleSubscriber();// 订阅者订阅发布者simplePublisher.subscribe(simpleSubscriber);// 发布者发布数据simplePublisher.publishItems();// 睡一觉,确保数据处理完成Thread.sleep(3000);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/334496.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue3 组合式 API 在 onMounted 中调用 dom 报错 Initialize failed: invalid dom.

问题 在开发的过程中&#xff0c;项目中需要用到 echarts&#xff0c;引入后在渲染的过程中报错了&#xff1a;Initialize failed: invalid dom. 这个报错表示元素在未渲染完成的情况下就被调用了&#xff0c;作者在以前也遇到过这种情况&#xff0c;在 vue2 中正常来说将 ech…

Parallel patterns: convolution —— An introduction to stencil computation

在接下来的几章中&#xff0c;我们将讨论一组重要的并行计算模式。这些模式是许多并行应用中出现的广泛并行算法的基础。我们将从卷积开始&#xff0c;这是一种流行的阵列操作&#xff0c;以各种形式用于信号处理、数字记录、图像处理、视频处理和计算机视觉。在这些应用领域&a…

C/C++学习笔记 vcpkg使用备忘及简要说明

一、简述 vcpkg 是一个免费的 C/C 包管理器&#xff0c;用于获取和管理库。从 1500 多个开源库中进行选择&#xff0c;一步下载并构建&#xff0c;或者添加您自己的私有库以简化构建过程。由 Microsoft C 团队和开源贡献者维护。 官方教程 vcpkg 文档 | Microsoft Learnvcpkg …

玩转硬件之玩改朗逸中控设备

这是一个有关一件被拆卸的朗逸中控设备的故事。这个设备已经闲置多年&#xff0c;但是它的命运发生了转变。它被改装成了一台收音机和MP3播放器。 这个设备曾经是一辆朗逸的中控屏幕&#xff0c;就是因为它没有倒车影像&#xff0c;它就被拆了下来&#xff0c;被扔在了一个角落…

Realm Management Extension领域管理扩展之安全状态

RME基于Arm TrustZone技术。TrustZone技术在Armv6中引入,提供以下两个安全状态: 安全状态(Secure state)非安全状态(Non-secure state)以下图表显示了在AArch64中的这两个安全状态以及通常在每个安全状态中找到的软件组件: 该架构将在安全状态运行的软件与在非安全状态运…

03.SpringCloud服务间远程调用

一、Feign远程调用 feign是基于nacos&#xff0c;所以需要先引入对应的依赖。 先来看我们以前利用RestTemplate发起远程调用的代码&#xff1a; 存在下面的问题&#xff1a; 代码可读性差&#xff0c;编程体验不统一 参数复杂URL难以维护 Feign是一个声明式的http客户端…

探索Java中的Map:领略键值对的无限魅力

目录 1、前言 2、介绍Map 2.1 什么是Map 2.2 Map的特点 3、常用的Map实现类 3.1 HashMap 3.2 TreeMap 3.3 LinkedHashMap 3.4 Hashtable 3.5 ConcurrentHashMap 4、操作Map的常用方法 5、Map的应用场景 5.1 缓存 5.2 数据存储 5.3 计数器 6、常见问题解答 6.1…

【漏洞复现】锐捷EG易网关cli.php后台命令执行漏洞

Nx01 产品简介 锐捷EG易网关是一款综合网关&#xff0c;由锐捷网络完全自主研发。它集成了先进的软硬件体系架构&#xff0c;配备了DPI深入分析引擎、行为分析/管理引擎&#xff0c;可以在保证网络出口高效转发的条件下&#xff0c;提供专业的流控功能、出色的URL过滤以及本地化…

极海APM32F003通过IEC 60730/60335功能安全认证,为产品安全保驾护航

近日&#xff0c;极海APM32F003系列工业级超值型MCU&#xff0c;已顺利通过IEC 60730/60335功能安全认证&#xff0c;并可提供符合CLASS B标准的功能安全设计套件&#xff0c;有助于客户减少认证时间与成本&#xff0c;快速推出稳定可靠的终端产品。 *VDE是德国的一个权威性电气…

selenium处理iframe网页办法

学习selenium中&#xff0c;发现有一些网页是嵌套的&#xff0c;即一个html嵌套另一个html&#xff0c;这被称为iframe,selenium只能查找主网页的元素&#xff0c;无法查看嵌套的&#xff0c;进入嵌套的一行代码,首先定位到iframe&#xff0c;然后browser.switch_to.frame(ifra…

如何在企业中实施自适应人工智能?

人工智能不再是企业的选择。很快&#xff0c;它也将不再是一个区分因素。商业中的适应性人工智能正在改变格局。根据最近的统计数据&#xff0c;95%的企业以上都在追求人工智能。 因此&#xff0c;为了确保你拥有竞争优势&#xff0c;你必须期待先进的人工智能选项。适应性就是…

开发分销商城小程序app,轻松助你业绩倍增

开发分销商城小程序app&#xff0c;轻松助你业绩倍增&#xff01; 1. 一键分享&#xff0c;业务拓展&#xff1a;分销商城小程序可生成独特的分销链接与二维码&#xff0c;让你的分销员分享给亲朋好友、社交媒体粉丝。迅速扩大销售网络&#xff0c;提升产品知名度。 2. 佣金管…