响应式编程之Project Reactor

news/2025/3/27 0:45:02/文章来源:https://www.cnblogs.com/wkynf/p/18789638

Project Reactor作为响应式编程范式的核心实现框架,严格遵循Reactive Streams规范体系,其架构设计完整包含了规范定义的四个核心组件:Publisher(数据源)、Subscriber(订阅者)、Subscription(订阅关系)和Processor(处理节点)。在该框架中,FluxMono不仅实现了Publisher接口的标准语义,更构建了完整的响应式数据流处理范式:通过订阅关系建立生产-消费通道,基于事件驱动机制实现非阻塞式数据推送,同时通过背压(backpressure)协议保障系统的弹性通信。

基本流程

从整体上理解 Project Reactor 的工作原理,能够帮助我们更清晰地掌握其中的各种概念和操作,避免迷失方向。实际上,从大局来看,整个 Reactor 就是基于订阅-发布模式的。Flux 和 Mono 作为系统中默认的 Publisher,简化了我们自定义 Publisher 的工作。Flux 和 Mono 集成了大量的操作符,这些操作符的存在减少了我们自定义 Subscriber 和 Processor 的需求。通过这些操作符的组合,我们可以直接对数据源和元素进行操作,而无需自己编写额外的 Processor 和 Subscriber。除非在特殊情况下,否则不建议主动去自定义 Subscriber 和 Processor。

创建数据源(Flux,Mono)->转换和处理数据(map,filter...)->subscribe订阅数据源

一、响应式数据源:

1.1 Flux与Mono

作为Project Reactor的核心发布者,Flux和Mono的主要区别如下:

  • Flux代表0-N个元素的异步序列

  • Mono表示0-1个结果的异步操作

// 创建Flux
Flux.just("1", "2", "3").subscribe(System.out::println);// 创建Mono
Mono.just("a").subscribe(System.out::println);

1.2 数据源类型

了解了Flux和Mono之后,我们知道了如何简单的创建数据源,其中Flux和Mono也给我们提供了非常多的创建数据源的方式,大概分为以下几类。

  1. ‌空数据源: 用于表示无数据的完成信号(如删除操作的结果)。
  2. ‌动态生成:Mono.createFlux.generate/Flux.create 允许手动控制元素发射(同步或异步)。
  3. ‌异步数据源: 从 FutureCallableSupplier 中获取数据,支持非阻塞操作。
  4. ‌时间驱动: Mono.delay 延迟发射,Flux.interval 周期性发射递增数值。
  5. ‌合并/组合: zip 严格对齐元素,merge 无序合并,concat 顺序连接。
  6. ‌背压适配: 通过 FluxSinkMonoSink 手动控制背压和元素发射。

Mono 和 Flux 数据源创建方式分类总结

类别 描述 Mono 方法示例 Flux 方法示例
空数据源 创建不发射任何元素的数据流。 Mono.empty() Flux.empty()
单个元素 发射单个静态值或对象。 Mono.just(T) Flux.just(T...)
多个元素 发射多个静态值或对象(仅 Flux 支持)。 N/A Flux.just(T1, T2...)
集合/数组 从集合或数组生成元素。 N/A Flux.fromIterable(List<T>) Flux.fromArray(T[])
流(Stream) 从 Java Stream 生成元素。 N/A Flux.fromStream(Stream<T>)
动态生成 通过生成器函数动态生成元素。 Mono.create(sink -> {...}) Flux.generate(sink -> {...}) Flux.create(sink -> {...})
异步数据源 从异步操作(如 FutureCallable)获取数据。 Mono.fromFuture(Future) Mono.fromCallable(Callable) Flux.from(Publisher) Flux.fromStream(Supplier<Stream>)
错误信号 直接发射错误信号。 Mono.error(Throwable) Flux.error(Throwable)
延迟初始化 惰性生成数据(订阅时才执行逻辑)。 Mono.defer(() -> ...) Mono.fromSupplier(Supplier) Flux.defer(() -> ...) Flux.fromStream(Supplier<Stream>)
时间驱动 基于时间生成数据(如定时、延迟)。 Mono.delay(Duration) Flux.interval(Duration)
合并/组合 合并多个数据源。 Mono.zip(Mono1, Mono2...) Flux.merge(Flux1, Flux2...) Flux.concat(Flux1, Flux2...) Flux.zip(Flux1, Flux2...)
背压适配 适配外部背压机制(如 Sink 手动控制)。 Mono.create(MonoSink) Flux.create(FluxSink)
条件触发 根据条件生成数据(如 firsttakeUntil)。 Mono.firstWithValue(Mono1, Mono2) Flux.firstWithValue(Publisher...) Flux.takeUntil(Predicate)

1.3 数据源发布模型

Project Reactor 的发布模型是其响应式编程的核心机制,主要分为 ‌冷发布者(Cold Publisher)‌ 和 ‌热发布者(Hot Publisher)‌。它们的区别在于数据流的生成、共享方式以及订阅者的消费行为。以下是详细解释:

1.3.1、冷发布者(Cold Publisher)

定义‌:冷发布者为每个订阅者生成‌独立的数据流‌。每个订阅者都会触发数据源的完整生成过程,即使其他订阅者已订阅过。

特点‌:

  1. 数据流独立‌:每个订阅者从头开始消费数据。
  2. 延迟生成‌:数据在订阅时才开始生成(惰性计算)。
  3. 资源隔离‌:不同订阅者的数据生成逻辑互不影响。

适用场景‌:

  • 需要每个订阅者获取完整数据(如 HTTP 请求、数据库查询)。

  • 数据源的生成成本较高,但需确保订阅者的独立性。

    代码示例

    // 创建冷发布者
    Flux<Integer> coldFlux = Flux.range(1, 3).doOnNext(i -> System.out.println("冷发布者发出: " + i));
    // 第一个订阅者
    coldFlux.subscribe(i -> System.out.println("订阅者1: " + i));
    // 第二个订阅者
    coldFlux.subscribe(i -> System.out.println("订阅者2: " + i));
    

    输出

    冷发布者发出: 1
    订阅者1: 1
    冷发布者发出: 2
    订阅者1: 2
    冷发布者发出: 1
    订阅者2: 1
    冷发布者发出: 2
    订阅者2: 2
    

1.3.2、热发布者(Hot Publisher)

  • 定义‌:热发布者共享一个‌统一的数据流‌,所有订阅者消费同一份数据。数据源的生成与订阅者的订阅时间无关,后订阅的订阅者可能错过早期数据。

    特点‌:

    1. 数据流共享‌:所有订阅者接收同一数据源。
    2. 实时性‌:数据源的生成独立于订阅行为。
    3. 资源复用‌:多个订阅者共享同一数据生成逻辑。

    适用场景‌:

    • 实时事件推送(如传感器数据、股票报价)。
    • 需要广播数据,避免重复生成高成本操作(如 WebSocket 消息)。

热发布者的实现方式有如下几种:

  1. ConnectableFlux(手动控制)

通过 publish() 方法将 Flux 转换为 ConnectableFlux,需手动调用 connect() 启动数据流。

代码示例

        // 创建 ConnectableFlux 并转换为热发布者ConnectableFlux<Integer> hotFlux = Flux.range(1, 3).doOnNext(i -> System.out.println("热发布者发出: " + i)).publish(); // 转换为 ConnectableFlux// 订阅者AhotFlux.subscribe(i -> System.out.println("订阅者A: " + i));// 订阅者BhotFlux.subscribe(i -> System.out.println("订阅者B: " + i));// 手动触发数据流开始hotFlux.connect();

输出

热发布者发出: 1
订阅者A: 1
订阅者B: 1
热发布者发出: 2
订阅者A: 2
订阅者B: 2
热发布者发出: 3
订阅者A: 3
订阅者B: 3
  1. autoConnect()‌(自动连接)

当达到指定订阅者数量时,自动启动数据流。

        Flux<Integer> autoFlux = Flux.range(1, 3).doOnNext(i -> System.out.println("热发布者发出: " + i)).publish().autoConnect(2);// 当有 2 个订阅者时自动启动autoFlux.subscribe(i -> System.out.println("订阅者A: " + i));autoFlux.subscribe(i -> System.out.println("订阅者B: " + i));

输出

热发布者发出: 1
订阅者A: 1
订阅者B: 1
热发布者发出: 2
订阅者A: 2
订阅者B: 2
热发布者发出: 3
订阅者A: 3
订阅者B: 3
  1. share()‌(简化热发布者)

等价于 publish().refCount(1):当第一个订阅者到来时启动,最后一个取消订阅时终止。

        Flux<Long> sharedFlux = Flux.interval(Duration.ofSeconds(1)).doOnNext(i -> System.out.println("热发布者发出: " + i)).take(5).share();sharedFlux.subscribe(i -> System.out.println("订阅者A: " + i));Thread.sleep(2500);sharedFlux.subscribe(i -> System.out.println("订阅者B: " + i)); // 订阅者B错过前2个数据

输出

热发布者发出: 0
订阅者A: 0
热发布者发出: 1
订阅者A: 1
热发布者发出: 2
订阅者A: 2
订阅者B: 2
热发布者发出: 3
订阅者A: 3
订阅者B: 3
热发布者发出: 4
订阅者A: 4
订阅者B: 4

  1. replay()‌(历史数据缓存)

允许新订阅者消费订阅前的历史数据(缓存策略可配置)。

        ConnectableFlux<Integer> replayFlux = Flux.range(1, 3).doOnNext(i -> System.out.println("热发布者发出: " + i)).replay(2);// 缓存最近2个数据replayFlux.subscribe(i -> System.out.println("订阅者A: " + i));replayFlux.connect();Thread.sleep(1000);replayFlux.subscribe(i -> System.out.println("订阅者B: " + i)); // 订阅者B收到最后2个数据

输出‌:

热发布者发出: 1
订阅者A: 1
热发布者发出: 2
订阅者A: 2
热发布者发出: 3
订阅者A: 3
订阅者B: 2
订阅者B: 3

冷发布者和热发布者对比表格

特性 冷发布者 热发布者
数据生成时机 订阅时生成 提前生成(或由 connect() 触发)
订阅者独立性 每个订阅者独立消费完整数据 共享同一数据流
资源消耗 高(每个订阅者独立生成) 低(共享生成逻辑)
典型场景 数据库查询、静态数据 实时事件、广播

二、强大的操作符生态系统

2.1 核心操作符分类

类别 操作符示例 功能描述
转换操作符 buffer, map, flatMap, window 修改流中元素结构或内容(如分组、映射、扁平化)
过滤操作符 filter, take, skip 按条件筛选元素(如保留满足条件的元素、跳过前N项)
组合操作符 merge, concat, zip 合并多个流(如按顺序连接、并行合并、元素一一配对)
条件操作符 any, all, hasElement 判断流中元素是否满足条件(如是否存在满足条件的元素)
数学操作符 count, sum, reduce 对元素进行聚合计算(如统计总数、求和、累加)
错误处理操作符 onErrorReturn, onErrorResume 异常时提供备选值或切换至备用流(如返回静态值、动态恢复逻辑)
工具操作符 delay, timeout, log, subscribe 控制流生命周期(如延迟发送、超时中断、记录日志、触发订阅)
整个数据源操作 doOnNext,,,doOnRequest,doOnSubscribe,doOnComplete 其中以doOn开头的可以对整个数据链的不同状态进行操作

2.2 常见操作(类似Java的Stream)

//转换操作符、过滤操、条件及数学操作符类似Java的Stream这里不做过多赘述
//map
Flux.just(1, 2, 3).map(i -> i + 1).subscribe(System.out::println);
//filter
Flux.just("a", "b", "c").filter(s -> s.equals("a")).subscribe(System.out::println);
//flatMap
Flux.just("a", "b", "c").flatMap(s -> Flux.just(s.toUpperCase())).subscribe(System.out::println);
//reduce
Flux.just(1, 2, 3).reduce(0, (a, b) -> a + b).subscribe(System.out::println);
//window  窗口使用
Flux.just(1, 2, 3, 4, 5, 6).window(3, 1).flatMap(e -> e.reduce(0, Integer::sum)).subscribe(System.out::println);
//buffer  背压或者批处理使用,会缓存数据
Flux.just(1, 2, 3, 4, 5, 6).buffer(3, 1).subscribe(System.out::println);

2.3 组合操作符

  • zip

zip操作符可以将多个(最多8个)流合并成一个流,合并的方式是将两个流中的元素按照顺序一一对应,然后将两个元素组合成一个元素。 如果两个流的长度不一致,那么最终合并成的流的长度就是两个流中长度较短的那个流的长度。

Flux<String> flux1 = Flux.just("a", "b", "c");
Flux<String> flux2 = Flux.just("d", "e", "f");
Flux<String> flux3 = Flux.just("1", "2", "3");
Flux.zip(flux1, flux2, flux3).subscribe(System.out::println);//输出
[a,d,1]
[b,e,2]
[c,f,3]
  • merge

merge 操作符可以将两个流合并成一个流,合并的方式是将两个流中的元素交替地放入到合并后的流中。同时运行,根据时间先后运行。

Flux<Integer> flux3 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(80));
Flux<Integer> flux4 = Flux.just(4, 5, 6).delayElements(Duration.ofMillis(50));
flux3.mergeWith(flux4).subscribe(System.out::println);//输出  由于是根据时间先后处理,所以结果大概率是这样,也有可能会稍有不同
4
1
5
2
6
3
  • concat

concat 操作符可以将两个流合并成一个流,合并的方式是将两个流中的元素按照顺序放入到合并后的流中。按照顺序分别运行,flux1运行完成以后再运行flux2

Flux<Integer> flux1 = Flux.just(1, 2, 3).delayElements(Duration.ofMillis(80));
Flux<Integer> flux2 = Flux.just(4, 5, 6).delayElements(Duration.ofMillis(50));
flux1.concatWith(flux2).subscribe(System.out::println);
//输出
1
2
3
4
5
6

2.4 整个数据源操作

Project Reactor 提供了大量的以doOn开头的方法,这些方法用于在数据流的生命周期中插入副作用逻辑(如日志、监控或资源管理),‌不修改数据流本身,仅用于观察或触发行为‌。

每个方法的使用方法大致相同,下面以doOnRequest和doOnNext做一下简单的示例。

Flux.just(1, 2, 3, 4, 5, 6).doOnNext(s -> System.out.println("doOnNext: " + s)).subscribe();
System.out.println("----------------");
Flux.just(1, 2, 3).doOnRequest(s -> System.out.println("doOnRequest: " + s)).subscribe(System.out::println);
//输出
doOnNext: 1
doOnNext: 2
doOnNext: 3
doOnNext: 4
doOnNext: 5
doOnNext: 6
----------------
doOnRequest: 9223372036854775807
1
2
3

下面是每个方法的使用场景和触发时机。

方法 触发时机 参数类型 典型场景
doOnSubscribe 订阅时 Consumer<Subscription> 资源初始化
doOnNext 元素推送时 Consumer<T> 日志记录、状态更新
doOnError 发生错误时 Consumer<Throwable> 错误监控、报警
doOnComplete 流正常结束时 Runnable 完成通知
doOnRequest 下游请求数据时 Consumer<Long> 背压调试、请求量监控
doOnCancel 取消订阅时 Runnable 资源释放
doOnEach 所有事件发生时 Consumer<Signal<T>> 统一事件处理
doOnTerminate 流终止前(完成/错误前) Runnable 终止前清理逻辑
doAfterTerminate 流终止后(完成/错误后) Runnable 终止后统计
doOnDiscard 元素被丢弃时 Consumer<T> 资源回收、数据一致性检查

三、执行控制:订阅与调度

3.1 订阅机制

subscribe 操作符用来订阅流中的元素。 当流中的元素没有被订阅的时候,所有的操作都不会触发,只有当流中的元素被订阅的时候,所有的操作才会触发。 通过上面内容的阅读,相信你已经对Project Reactor的发布订阅模型已经了解了个大概,上面的订阅的例子也有很多,这里不做过多的赘述。

3.2 调度器策略

Schedulers‌ 是管理线程和并发任务的核心工具,用于控制响应式流的执行上下文。通过合理选择调度器,可以优化资源利用、避免阻塞,并提升应用性能

调度器 线程模型 适用场景 注意事项
immediate 当前线程 轻量级同步操作 避免阻塞
single 单线程 严格顺序执行 避免长时间阻塞
boundedElastic 动态线程池 阻塞 I/O 操作 控制最大线程数和队列容量
parallel 固定大小线程池 计算密集型并行任务 线程数默认等于 CPU 核心数
fromExecutorService 自定义线程池 集成现有线程池 需自行管理生命周期

3.3 默认调度器

在 Project Reactor 中,可以很方便的通过publishOn和subscribeOn来切换使用的线程调度器。

Flux.range(1, 10).publishOn(Schedulers.boundedElastic()) //切换调度器.log("publish thread:").flatMap(n -> Mono.fromCallable(() -> n).subscribeOn(Schedulers.parallel()))  //切换调度器.log("subscribe thread:").subscribe();

3.4 自定义虚拟线程调度器

当然在JDK17及更改的版本中也可以结合虚拟线程进一步提高并发量。

Scheduler customSchedule = Schedulers.fromExecutor(Executors.newVirtualThreadPerTaskExecutor());
Flux.range(1, 10).publishOn(customSchedule).log("publish thread:").flatMap(n -> Mono.fromCallable(() -> n).subscribeOn(Schedulers.parallel())).log("subscribe thread:").subscribe();

四、高级控制组件

4.1 Processor与Sink的关系

在 Project Reactor 中,‌Processor‌ 曾是一个关键组件,但随着 Reactor 3.4+ 版本的演进,官方逐渐将其标记为‌弃用(Deprecated)‌,并推荐使用更现代的 ‌Sink API‌ 替代。以下是弃用原因、两者核心区别。

1. ‌线程安全

  • processor:大多数 Processor 实现(如 DirectProcessorUnicastProcessor)‌非线程安全‌,直接调用 onNextonComplete 等方法需手动同步。
  • Sink: 原子性操作‌:Sink 提供 tryEmitNexttryEmitError 等方法,确保多线程推送数据时的安全性。

2. ‌角色定位

  • Processor: 同时作为 PublisherSubscriber,这种设计虽然灵活,但导致职责不清晰,容易误用。
  • Sink:仅作为纯生产者(仅生成数据流)

3. ‌背压处理

  • processor

    对背压的支持差异大:

    • DirectProcessor 完全忽略背压(无界队列)。
    • UnicastProcessor 支持单订阅者的背压,但需手动配置缓冲区。
  • Sink‌:内置配置,通过 onBackpressureBufferonBackpressureError 等链式方法直接定义背压行为。

4. ‌生命周期管理复杂

  • processor‌:需显式调用 onCompleteonError 结束流,若遗漏可能导致资源泄漏或订阅者挂起。
  • Sink:通过 tryEmitCompletetryEmitError 明确结束流,避免资源泄漏。

5. ‌API 设计

  • processor‌:Processor 的 API 未针对现代响应式编程模式优化(如缺少对重试、重播的内置支持)。
  • Sink:灵活简单,通过 Sinks.Manymulticast()unicast()replay() 快速配置多订阅者行为。

4.2 API使用示例

​ 由于processor已经被弃用,不推荐使用,这里不做过多介绍。

  • 1:发送单个数据

    Sinks.One<String> sink = Sinks.one();
    Mono<String> mono = sink.asMono();
    mono.subscribe(value -> System.out.println("Received: " + value),error -> System.err.println("Error: " + error),() -> System.out.println("Completed")
    );
    sink.tryEmitValue("Hello");  // 等效于 tryEmitNext + tryEmitComplete
    
  • 2:发送多个数据

    // 创建多播 Sink, 并设计缓冲被压策略
    Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
    //转换为flux
    Flux<String> hotFlux = sink.asFlux().map(String::toUpperCase);// 订阅者A
    hotFlux.subscribe(i -> System.out.println("订阅者A: " + i));// 订阅者B
    hotFlux.subscribe(i -> System.out.println("订阅者B: " + i));// 发送数据
    sink.tryEmitNext("hello");
    sink.tryEmitNext("world");
    sink.tryEmitComplete();
    
  • 3:支持历史数据

// 创建重播 Sink,保留最近 2 个元素
Sinks.Many<String> sink = Sinks.many().replay().limit(2);sink.tryEmitNext("A");
sink.tryEmitNext("B");// 订阅者1 (接收历史数据 A, B)
sink.asFlux().subscribe(s -> System.out.println("Sub1: " + s));// 推送新数据
sink.tryEmitNext("C");// 订阅者2(接收历史数据 B, C)
sink.asFlux().subscribe(s -> System.out.println("Sub2: " + s));//输出
Sub1: A
Sub1: B
Sub1: C
Sub2: B
Sub2: C

4.2 背压

4.2.1 背压策略

1.onBackpressureBuffer(缓冲策略)

  • 行为‌:将未消费的数据存储在缓冲区中,等待下游请求时发送。
  • 配置选项
    • 缓冲区大小‌:可指定有界或无界(默认无界,需谨慎使用)。
    • 溢出策略
      • ERROR:缓冲区满时抛出 IllegalStateException
      • DROP_LATEST:丢弃新数据,保留旧数据。
      • DROP_OLDEST:丢弃最旧数据,保留新数据。
  • 适用场景‌:允许短暂的速度不匹配,但需控制内存占用。

2. onBackpressureError(错误策略)

  • 行为‌:当缓冲区满或下游未请求时,‌立即抛出错误‌(IllegalStateException)。
  • 适用场景‌:严格要求实时性,容忍数据丢失但需快速失败。

3. directBestEffort(尽力而为策略)

  • 行为‌:无缓冲区,直接推送数据到下游。如果下游未请求,‌静默丢弃新数据‌。
  • 特点‌:避免内存占用,但可能导致数据丢失。
  • 适用场景‌:实时事件处理(如日志、指标采集),允许偶尔丢失。

4. replay(重播策略)

  • 行为

    新订阅者重播历史数据

    • 同时支持实时数据推送。

    • 可配置重播的缓冲区大小(如保留最近的 N 个元素)。

  • 背压处理

    • 对新订阅者:重播历史数据时遵循背压请求。
    • 对实时数据:使用 onBackpressureBufferdirectBestEffort 策略。
  • 适用场景‌:需要新订阅者获取历史数据的场景。

4.2.2. 默认策略

  • multicast()‌:默认使用 directBestEffort(无缓冲区)。
  • unicast()‌:默认使用 onBackpressureBuffer(无界缓冲区)。
  • replay()‌:默认保留所有历史数据(无界缓冲区)。

五、 Hooks与Context

5.1 Hooks

在 Project Reactor 中,‌Hooks‌ 是一组全局回调机制,允许对 Reactor 库的默认行为进行‌定制化扩展‌,用于调试、监控或修改响应式流的执行逻辑。

1、Hooks 的核心用途

  1. 全局错误处理‌:捕获未被下游处理的异常。
  2. 操作符生命周期监控‌:在操作符执行前后插入自定义逻辑。
  3. 调试与追踪‌:增强堆栈跟踪信息,定位异步流问题。
  4. 行为修改‌:动态替换或包装操作符的实现。

2、常用 Hooks 及功能

1. onOperatorError

  • 作用‌:捕获操作符执行过程中抛出的‌未处理异常‌。

  • 典型场景‌:统一日志记录、转换错误类型。

    Hooks.onOperatorError((error, context) -> {System.err.println("全局捕获异常: " + error);return error; 
    });
    

2. onNextDropped

  • 作用‌:处理因下游取消订阅、背压溢出等原因被‌丢弃的 onNext 元素‌。

  • 典型场景‌:记录丢失的数据,用于审计或补偿。

    Hooks.onNextDropped(item -> System.out.println("元素被丢弃: " + item)
    );
    

3. onErrorDropped

  • 作用‌:处理因下游已终止(如已调用 onComplete)而被‌丢弃的 onError 信号‌。

  • 典型场景‌:避免静默忽略错误。

    Hooks.onErrorDropped(error -> System.err.println("错误被丢弃: " + error)
    );
    

4. onOperatorDebug

  • 作用‌:启用‌调试模式‌,为异步操作符生成增强的堆栈跟踪信息(含订阅点位置)。

  • 代价‌:增加性能开销,‌仅限开发环境使用‌。

    Hooks.onOperatorDebug(); // 启用调试模式
    

5. onEachOperator / onLastOperator

  • 作用‌:在‌每个操作符执行前后‌插入自定义逻辑(如日志、指标采集)。

  • 典型场景‌:性能监控、动态修改数据流。

    Hooks.onEachOperator(operator -> {long start = System.currentTimeMillis();return original -> original.doFinally(signal -> System.out.println("操作符耗时: " + (System.currentTimeMillis() - start) + "ms"));
    });
    

6. 重置 Hooks

  • 恢复默认行为

    javaCopy CodeHooks.resetOnOperatorError();
    Hooks.resetOnNextDropped();
    Hooks.resetOnOperatorDebug();
    

4.4 Context

在 Project Reactor 中,‌Context‌ 是用于在响应式流的各个阶段之间传递‌上下文数据‌的核心机制。它解决了传统 ThreadLocal 在异步、多线程环境中的局限性,允许数据在操作符链中安全传递。以下是 Context 的详细解析,涵盖其设计思想、API 使用及典型场景。

1. 为什么需要 Context?
  • 问题‌:在异步响应式流中,数据可能由不同线程处理,ThreadLocal 无法跨线程传递。
  • 解决方案‌:Context 提供一种与订阅链绑定的、不可变的键值存储,确保上下文数据在流的生命周期内可被安全访问。
2. Context 的特点
  • 不可变性‌:每次修改会生成新实例,确保线程安全。
  • 订阅链绑定‌:数据跟随订阅链传递,而非依赖线程(需要注意的是Context的传递是从底部往上传递的)。
  • 键值存储‌:类似 Map 结构,支持类型安全的键(ContextKey)。
  • 自底向上(Downstream → Upstream)
    • 写入顺序‌:后调用的 contextWrite 会覆盖先调用的。
    • 读取顺序‌:下游(靠近订阅点)的 Context 优先被访问。

通过 contextWrite 操作符将 Context 写入响应式流,通过deferContextual在流中读取 Context

//注意由于ontext的传递是从底部往上传递的,所以必须在下面(A点)先写入才能在(B点读取到)
Flux.just("A", "B", "C", "D")//记为B点  拼接 Context 中的值.flatMap(s -> {System.out.println("ssss:" + s);return Mono.deferContextual(ctx -> Mono.just(s + ctx.get("suffix")));})//记为A点  写入 Context(关键:必须在读取操作之前调用).contextWrite(Context.of("suffix", "-ctx"))// 订阅输出结果.subscribe(System.out::println);

Context自底向上(Downstream → Upstream)传播示例

由于Context自底向上的传播特性,所以Context中B点的值会覆盖A点的值

Flux.just("A", "B", "C", "D")// 拼接 Context 中的值.flatMap(s -> {//由于ctx222会覆盖ctx111,所以此处拼接的是ctx222System.out.println("ssss:" + s);return Mono.deferContextual(ctx -> Mono.just(s + ctx.get("suffix")));})//记为B点,    写入 Context ctx222会覆盖ctx111.contextWrite(Context.of("suffix", "-ctx222"))//记为A点,    写入 Context.contextWrite(Context.of("suffix", "-ctx111"))// 订阅输出结果.subscribe(System.out::println);//输出
ssss:A
A-ctx222
ssss:B
B-ctx222
ssss:C
C-ctx222
ssss:D
D-ctx222

结语

通过深入理解Project Reactor这些核心概念,可以更好地驾驭响应式编程范式,构建出更高效、更弹性的分布式系统。与现代虚拟线程的结合,为构建新一代高并发应用提供了更优解。通过合理选择调度策略、优化线程模型,可以在保持代码简洁的同时,充分发挥硬件性能。

路漫漫其修远兮,吾将上下而求索


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/904842.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

centOS 上部署hadoop+mysql+hive 服务之mysql 和hive安装

hive默认存放元数据的数据库是Derby数据库,Derby数据库是嵌入式数据库,它只能单用户访问,也就是只能有一个会话连接到元数据存储,不适合多用户连接操作需求。比如,多用户同时进行查询或并发操作时,Derby无法处理,这会导致性能瓶颈或直接报错。因此,建议替换为用mysql在…

Pydantic根校验器:构建跨字段验证系统

title: Pydantic根校验器:构建跨字段验证系统 date: 2025/3/24 updated: 2025/3/24 author: cmdragon excerpt: Pydantic根校验器支持预处理(pre)与后处理(post)模式,可访问全量字段数据并修改值字典。多字段关联验证实现业务规则检查,如航班时间顺序与保险策略联动。分阶…

机器人焊接节气设备

WGFACS(Welding Gas Flow Automatic Control System)机器人焊接气体流量自适应控制系统。机器人焊接节气设备包含高速控制和采集系统与气体流量自适应装置。它专为在保证焊接质量的前提下,实现节能减排、降低用气成本而设计,广泛应用于汽车制造、航空航天、机械加工等多个行业…

做销售,会讲故事,能说服90%的客户!

在开始之前,先抛出一个问题:说服他人最有效的方式是什么? 或许很多人的第一反应是:事实和数据。毕竟,数据被认为是不会说谎的(当然,也存在数据造假的情况),人们似乎也更倾向于相信数据。正因如此,许多销售话术总会包含大量数据,诸如升学率达 xx%、录取率为 xx%、满意…

CSP安全策略与XSS漏洞的关系

CSP与XSS漏洞的关系1、CSP不依赖漏洞的存在: 2、CSP是独立的安全机制,其有效性取决于策略本身的严格性,而非漏洞是否存在。 例如: 严格CSP(基于nonce):即使站点存在XSS漏洞,攻击者也无法伪造合法的 nonce 值来执行脚本。 宽松CSP(允许unsafe-inline):若漏洞存在,攻…

vue项目启动报错---FATAL ERROR js堆内存不足

vue项目启动报错---FATAL ERROR js堆内存不足 估计项目太大 电脑性能扛不住 解决方法如下: 1.启动项目报错 估计是项目太大了 电脑性能不太行FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory 2.白话文---致命…

spring boot项目打包例子

1.进入到spring项目执行 mvn clean package2.这个时候会在target目录下生成如下包 3.运行 java -jar demo-0.0.1-SNAPSHOT.war 4.调用 5.项目目录结构

攻击

sql注入 SQL注入是一种常见的Web应用程序漏洞,攻击者可以利用它来执行恶意的SQL查询,从而绕过身份验证、访问敏感数据或对数据库进行破坏。以下是一些常见的SQL注入攻击语句示例:基本的SQL注入攻击:SELECT * FROM users WHERE username = admin AND password = OR 1=1; Co…

CTF

Web前置技能 HTTP请求 请求方式读题get传参类型 UseCTF**B那就传参CTFHUB得到flagBP抓包改包得到flag302跳转点击这个会进行http重定向重新返回这个页面 我们就可以用BP截取截取第一次请求得到flag cookie看提示唯一admin可以得到flag基本认证响应包源代码 信息泄露 目标遍历ph…

PPT 转高精度图片 API 接口

PPT 转高精度图片 API 接口 文件处理 / 图片处理,将 PPT 文件转换为图片序列。1. 产品功能支持将 PPT 文件转换为高质量图片序列; 支持 .ppt 和 .pptx 格式; 保持原始 PPT 的布局和样式; 转换后的图片支持永久访问; 全接口支持 HTTPS(TLS v1.0 / v1.1 / v1.2 / v1.3); …

20242931 2024-2025-2 《网络攻防实践》第四周作业

20242931 2024-2025-2 《网络攻防实践》第四周作业 1. 实验内容 1.1 实验要求 在网络攻防实验环境中完成TCP/IP协议栈重点协议的攻击实验,包括ARP缓存欺骗攻击、ICMP重定向攻击、SYN Flood攻击、TCP RST攻击、TCP会话劫持攻击。 1.2 知识点梳理与总结攻击类型 攻击原理 攻击目…

面试官:工作中优化MySQL的手段有哪些?

MySQL 是面试中必问的模块,而 MySQL 中的优化内容又是常见的面试题,所以本文来看“工作中优化MySQL的手段有哪些?”。工作中常见的 MySQL 优化手段分为以下五大类:索引优化:确保高频查询字段有合适索引。 SQL优化:减少全表扫描、避免不必要计算。 事务与锁优化:避免长事…