关于响应式流

news/2024/11/15 11:45:13/文章来源:https://www.cnblogs.com/jhfnewstart/p/18547654

响应式流处理引入了类似于观察者模式(Observer Pattern)的异步、非阻塞、事件驱动的编程范式,允许数据作为连续的流进行处理。它可以处理异步数据流,并支持 back-pressure(反压),这意味着消费者可以以它们能够处理的速度来消费数据。

Project Reactor

Project Reactor 是一个实现了响应式流规范的 Java 库,提供了丰富的 API 用于编程处理异步数据流。任何时候数据流都可以被视为一个系列的事件,这在概念上与 Reactor 模式类似。
Project Reactor为 Java 提供了异步和事件驱动的编程模型。这个库主要提供了两种核心的响应式类型:MonoFlux,以及管理异步任务执行的 Schedulers

Mono

Mono 代表一个单一的异步计算值,可以发出0或1个元素。它是处理单一异步操作的理想选择,例如异步的HTTP请求响应、单个数据库查询等。

使用示例

import reactor.core.publisher.Mono;public class MonoExample {public static void main(String[] args) {Mono<String> mono = Mono.just("Hello World");mono.subscribe(System.out::println); // 输出 Hello World}
}

在这个例子中,Mono.just 创建了一个包含单一值的 Mono 实例,并通过 subscribe 方法输出这个值。

优点

  • 简洁的 API,适合处理单一值或单一事件。
  • 支持简单和复杂的异步转换操作。
  • 减少资源消耗(相比返回整个集合的服务)。

缺点

  • 对于初学者而言,响应式编程的学习曲线较陡峭。
  • 错误处理和调试可能比较复杂。

Flux

Flux 代表一个异步序列,可以发出0到N个元素。它适用于处理多值响应,如数据流、事件流或其他类型的元素集合。

使用示例

import reactor.core.publisher.Flux;public class FluxExample {public static void main(String[] args) {Flux<Integer> flux = Flux.just(1, 2, 3, 4);flux.subscribe(System.out::println); // 输出 1, 2, 3, 4}
}

在这里,Flux.just 生成一个包含多个整数的 Flux 实例,并通过 subscribe 输出每个整数。

优点

  • 灵活地处理多个数据。
  • 支持多种反压策略来应对数据流速率的控制。
  • 强大的操作符集合,支持复杂的数据流转换和合并操作。

缺点

  • 资源消耗可以随着流中元素数量的增加而增加。
  • 需要更多的控制和错误处理策略。

Schedulers

Schedulers 是 Reactor 提供的一个组件,用于控制异步任务在不同的工作线程或线程池上执行。它使得开发者可以灵活地控制执行环境,优化性能,尤其适合于处理IO密集或CPU密集的任务。

使用示例

Mono.fromCallable(() -> {return "Some IO operation result";
}).subscribeOn(Schedulers.boundedElastic()).subscribe(System.out::println);

在这个例子中,使用 Schedulers.boundedElastic() 确保在一个弹性的、旨在为阻塞任务优化的线程池中执行 I/O 操作。

优点

  • 灵活的线程管理,能够根据任务特性优化线程的使用。
  • 可以避免阻塞主线程,提高应用的响应性和吞吐量。

缺点

  • 管理不当可能导致资源过度使用,如创建过多的线程。
  • 可能增加应用的复杂性。

每种组件都有其适用场景和限制。选择合适的类型和调度器,可以帮助开发高效、响应快速的应用。

Schedulers.boundedElastic()

在响应式编程和特别是使用诸如 Project Reactor 这样的库时,确保系统的鲁棒性和避免像内存耗尽(Out of Memory, OOM)这种问题非常重要。在响应式流中使用 subscribeOn(Schedulers.boundedElastic()) 联合 onErrorResume 方法可以在一定程度上帮助管理资源并处理错误,从而避免系统因异常情况如OOM而崩溃。让我们逐一看看这些组件怎样工作,并解释其对预防OOM的潜在影响。

Schedulers.boundedElastic()
在 Project Reactor 中,Schedulers.boundedElastic() 创建了一个可以根据需要弹性扩展的调度器,但有一定的限制,用于避免无限制地创建线程。boundedElastic调度器主要用于执行阻塞操作或长时间运行的任务,而不会消耗过多的系统资源(例如线程)。线程数的上限默认为CPU核数的10倍,任务队列的大小限制为10,000。当线程和队列达到上限时,进一步的任务提交会被延迟,直到有空闲资源。

这种调度器的使用尤其适合处理那些不适合在响应式流的主线程中执行的阻塞I/O操作,因为这些操作会阻塞处理流程,减慢系统反应速度。

onErrorResume
onErrorResume 是一个错误处理操作符,它允许你在发生错误(如抛出异常)时提供一个备用的数据流。这意味着,如果原始流中出现了错误,比如因为OutOfMemoryError而失败,你可以决定终止出错的流并继续执行,恢复成一个预设的备用流,或者简单地清理资源并进行适当的日志记录。

避免OOM的潜在影响
使用 subscribeOn(Schedulers.boundedElastic()) 允许长时间运行的或阻塞的任务在单独的线程中执行,这样主流程就不会被阻塞。这种方式可减少主系统线程的负载,从而降低因资源耗尽导致OOM的风险。

通过 onErrorResume 提供的错误处理机制,当流中的某个部分出现问题(例如,因为无法处理的大量数据而导致的OOM),程序可以捕获这个异常并优雅地处理,比如释放资源、调整内存使用、或者切换到更为节省内存的操作,而不是让整个应用崩溃。

综合使用这两个方法不仅增强了程序的健壮性,而且提供了更细粒度的资源控制和错误处理选项,有助于在面对资源限制和不可预测错误时更好地维护应用的稳定运行。然而,它们并不能直接防止OOM异常的发生,正确的数据管理和流控制策略才是根本解决方式。这包括合理的反压策略、数据分批处理、合理的内存和资源分配等措施。

反压

反压(Back-Pressure)的概念

在响应式流处理中,反压(Back-Pressure)是一种重要的机制,用于处理数据生产者(Publisher)和消费者(Subscriber)之间速率不一致的问题。当生产者产生数据的速度超过消费者处理数据的速度时,未处理的数据可能会在内存中积压,导致资源耗尽,甚至应用程序崩溃。反压机制允许消费者根据自己的处理能力,控制生产者的数据生产速度,从而有效避免这一问题。

反压的工作原理

在响应式编程标准(如Reactive Streams)中,当消费者准备好接收新的数据项时,它会向生产者发送一个请求信号,指明可以发送多少数据项。这样,生产者在发送数据前会等待这个请求信号,从而确保不会发送消费者处理不了的数据量。

应用场景

  1. 实时数据流处理:例如,处理来自传感器或用户界面事件的数据流,其中数据产生速度可能非常快。
  2. 资源有限的环境:在内存或处理能力有限的设备上,如移动设备或嵌入式系统,反压机制可以防止系统过载。
  3. 高负载系统:在服务器处理大量并发请求或大数据处理任务时,反压机制帮助系统平衡负载,提高稳定性。

如何使用反压

在Java中,通过发布订阅框架(如Reactor或RxJava)可以很容易地实现反压。

使用 Project Reactor 实现反压:

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;public class BackPressureExample {public static void main(String[] args) {Flux.range(1, 100) // 生产一个1到100的数列.onBackpressureDrop() // 当Backpressure时,丢弃无法处理的数据.publishOn(Schedulers.parallel(), 1) // 指定使用并行调度器,并设置最大请求量为1.subscribe(data -> {System.out.println("Received: " + data);try {Thread.sleep(100); // 模拟慢消费者} catch (InterruptedException e) {Thread.currentThread().interrupt();}},err -> System.err.println("Error: " + err),() -> System.out.println("Completed"));}
}

在这个例子中,.onBackpressureDrop()操作符用于指定如何处理溢出数据。当消费者处理速度跟不上生产者时,超出的数据将被丢弃。

反压的优缺点

优点:

  1. 避免资源耗尽:通过控制数据流的速度,反压机制防止了因数据堆积而导致的内存耗尽。
  2. 提高系统稳定性:系统更加稳定,因为数据流速度被适当控制,避免了过载。
  3. 适应性:反压机制自动适应消费者的处理能力,优化整个数据流的处理过程。

缺点:

  1. 复杂性增加:实现反压增加了系统的设计和实现复杂性。
  2. 性能影响:在某些情况下,为了实现反压控制,可能需要额外的系统资源监控或调整,并可能影响系统性能。
  3. 数据丢失:在使用如dropbuffer策略处理反压时,可能会导致数据丢失。

正确实现和配置反压机制,对于构建高性能且稳定的响应式系统至关重要。开发者需要根据具体应用场景做出适当的设计选择。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/833945.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MATLAB R2023b for Mac(专业的编程和数学计算软件)v23.2.0.2428915激活版

MATLAB R2023b是MathWorks公司推出的一款专业的数值计算和数据可视化软件,它是MATLAB软件系列的最新版本。该软件在科学、工程和金融等领域的数据分析和模拟方面表现出色,具有强大的功能和工具。MATLAB R2023b引入了更强大的并行计算功能,提高了工作效率,使得用户可以更快地…

好用的视频照片格式转换软件Permute 3你用了吗?

Permute 3是一款功能强大的媒体文件格式转换软件,它以其简洁高效的界面和丰富的功能赢得了用户的青睐。该软件支持视频、音频、图片等多种文件格式的转换,包括但不限于MP4、AVI、MOV、MKV等视频格式,以及MP3、WAV、AAC等音频格式,还有JPEG、PNG等图片格式。用户只需将文件拖…

空壳分身产品之路:直面自身的缺点

这大概是极少见的官方展现自身产品缺陷的文章,旨在帮助用户从多个维度快速判断该产品是否符合个人需求,以避免浪费时间和精力在不必要的下载、安装和使用上。这大概是极少见的官方展现自身产品缺陷的文章,从产品体验,功能设计,技术方案和未来规划的角度,阐述空壳产品的做…

一文解读GaussDB(DWS)监控运维诊断优化能力

帮助开发者分析执行计划,分析SQL语句执行性能消耗,提升租户侧自运维能力。本文分享自华为云社区《GaussDB(DWS) 监控运维诊断优化,历史查询诊断》,作者: yd_219384351。DWS历史查询诊断,基于DWS集群历史topsql,提供异常诊断能力。提供SQL趋势统计分析曲线图,展示SQL历史…

Git进阶实用命令

总结最常用的git命令操作。Mac推荐可视化软件Sourcetree1. 本地仓库git init # 初始化本地git 以下所有操作的前提条件git add -A # 添加当前所有变动文件到本地缓存区 git commit -m <commit-word> # 提交缓存区内容到本地仓库 git commit -am <commit-word> # 上…

go fiber: 抛出自定义异常

一,代码: 1,自定义错误类: package configimport ("fmt" ) //定义错误代码和错误信息 type MyError struct {Code intMsg string } //需要定义通用的Error()方法 func (e MyError) Error() string {return fmt.Sprintf("Code: %d,Msg: %s",e.Code…

移动端按住说话样式

下面是最终效果,手指移出指定区域就改为取消状态,松开手指就取消,手指没有移出指定区域,状态为录音中,松开手指为结束录音状态下面是代码<!DOCTYPE html> <html lang="zh"> <head><meta charset="UTF-8"><meta name="…

联想 进入bios u盘启动

先插入u盘,再选择选项 thinkpad e485 , f12快速选择启动选项 , f1 进入bios其他 联想进入bios设置u盘启动,网上的博客资料中有这样一句话: “7、在Startup栏目中,找到“UEFI/Legacy Boot”选项,设置为“Legacy Only”,并将“CSM Support”设置为“YES”,然后找到“…

Mybatis-plus之新特性,你都用过哪些?

1.lambda方式查询 在使用Mybatis-plus进行查询时,我们正常的操作是创建一个QueryWrapper,然后根据字段去做查询操作(如下图)那么就有一个问题,每个数据库的字段都需要写出来,遇到驼峰字段还需要转换为下划线形式,非常影响开发效率。而官方也考虑到这个问题,后续的版本已…

KingbaseES V8R3集群运维案例之---集群恢复案例

KingbaseES、Kingbasecluster案例说明: KingbaseES V8R3流复制集群在专用机环境下,出现异常问题,恢复过程如下: 问题现象及解决方案: 1、现象一 如下所示,整个集群无法启动,选择其中一个节点作为主节点,数据库服务启动如下所示:解决方案: 将sys_xlog下的xxxxA.histor…