spring webflux 小结

一、WebFlux 简介

WebFlux 是 Spring Framework5.0 中引入的一种新的反应式Web框架。通过Reactor项目实现Reactive Streams规范,完全异步和非阻塞框架。本身不会加快程序执行速度,但在高并发情况下借助异步IO能够以少量而稳定的线程处理更高的吞吐,规避文件IO/网络IO阻塞带来的线程堆积。

1.1 WebFlux 的特性

WebFlux 具有以下特性:

  • 异步非阻塞 - 可以举一个上传例子。相对于 Spring MVC 是同步阻塞IO模型,Spring WebFlux这样处理:线程发现文件数据没传输好,就先做其他事情,当文件准备好时通知线程来处理(这里就是输入非阻塞方式),当接收完并写入磁盘(该步骤也可以采用异步非阻塞方式)完毕后再通知线程来处理响应(这里就是输出非阻塞方式)。

  • 响应式函数编程 - 相对于Java8 Stream 同步、阻塞的Pull模式,Spring Flux 采用Reactor Stream 异步、非阻塞Push模式。书写采用 Java lambda 方式,接近自然语言形式且容易理解。

  • 不拘束于Servlet - 可以运行在传统的Servlet 容器(3.1+版本),还能运行在Netty、Undertow等NIO容器中。

1.2 WebFlux 的设计目标

  • 适用高并发

  • 高吞吐量

  • 可伸缩性

二、Spring WebFlux 组件介绍

2.1 HTTPHandler

一个简单的处理请求和响应的抽象,用来适配不同HTTP服务容器的API。

2.2 WebHandler

一个用于处理业务请求抽象接口,定义了一系列处理行为。相关核心实现类如下;

2.3 DispatcherHandler

请求处理的总控制器,实际工作是由多个可配置的组件来处理。

WebFlux是兼容Spring MVC 基于@Controller,@RequestMapping等注解的编程开发方式的,可以做到平滑切换。

2.4 Functional Endpoints

这是一个轻量级函数编程模型。是基于@Controller,@RequestMapping等注解的编程模型的替代方案,提供一套函数式API 用于创建Router,Handler和Filter。调用处理组件如下:

简单的RouterFuntion 路由注册和业务处理过程:

@Bean
public RouterFunction<ServerResponse> initRouterFunction() {return RouterFunctions.route().GET("/hello/{name}", serverRequest -> {String name = serverRequest.pathVariable("name");return ServerResponse.ok().bodyValue(name);}).build();
}

请求转发处理过程:

@Bean
public RouterFunction<ServerResponse> initRouterFunction() {return RouterFunctions.route().GET("/hello/{name}", serverRequest -> {String name = serverRequest.pathVariable("name");return ServerResponse.ok().bodyValue(name);}).build();
}

2.5 Reactive Stream

这是一个重要的组件,WebFlux 就是利用Reactor 来重写了传统Spring MVC 逻辑。其中Flux和Mono 是Reactor中两个关键概念。掌握了这两个概念才能理解WebFlux工作方式。

Flux和Mono 都实现了Reactor的Publisher接口,属于时间发布者,对消费者提供订阅接口,当有事件发生的时候,Flux或者Mono会通过回调消费者的相应的方法来通知消费者相应的事件。这就是所谓的响应式编程模型。

Mono工作流程图

只会在发送出单个结果后完成。

Flux工作流程图

发送出零个或者多个,可能无限个结果后才完成。

对于流式媒体类型:application/stream+json 或者 text/event-stream ,可以让调用端获得服务器滚动结果。
对于非流类型:application/json  WebFlux 默认JSON编码器会将序列化的JSON 一次性刷新到网络,这并不意味着阻塞,因为结果Flux<?> 是以反应式方式写入网络的,没有任何障碍。

三、WebFlux 工作原理

3.1 组件装配过程

流程相关源码解析-WebFluxAutoConfiguration

@Configuration
//条件装配 只有启动的类型是REACTIVE时加载
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
//只有存在 WebFluxConfigurer实例  时加载
@ConditionalOnClass(WebFluxConfigurer.class)
//在不存在  WebFluxConfigurationSupport实例时 加载
@ConditionalOnMissingBean({ WebFluxConfigurationSupport.class })
//在之后装配
@AutoConfigureAfter({ ReactiveWebServerFactoryAutoConfiguration.class,CodecsAutoConfiguration.class, ValidationAutoConfiguration.class })
//自动装配顺序
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
public class WebFluxAutoConfiguration {@Configuration@EnableConfigurationProperties({ ResourceProperties.class, WebFluxProperties.class })//接口编程 在装配WebFluxConfig 之前要先 装配EnableWebFluxConfiguration@Import({ EnableWebFluxConfiguration.class })public static class WebFluxConfig implements WebFluxConfigurer {//隐藏部分源码/*** Configuration equivalent to {@code @EnableWebFlux}.*/} @Configurationpublic static class EnableWebFluxConfigurationextends DelegatingWebFluxConfiguration {//隐藏部分代码}@Configuration@ConditionalOnEnabledResourceChainstatic class ResourceChainCustomizerConfiguration {//隐藏部分代码}private static class ResourceChainResourceHandlerRegistrationCustomizerimplements ResourceHandlerRegistrationCustomizer {//隐藏部分代码}

WebFluxAutoConfiguration 自动装配时先自动装配EnableWebFluxConfiguration 而EnableWebFluxConfiguration->DelegatingWebFluxConfiguration ->WebFluxConfigurationSupport。

最终WebFluxConfigurationSupport 不仅配置DispatcherHandler 还同时配置了其他很多WebFlux核心组件包括 异常处理器WebExceptionHandler,映射处理器处理器HandlerMapping,请求适配器HandlerAdapter,响应处理器HandlerResultHandler 等。

DispatcherHandler 创建初始化过程如下;

public class WebFluxConfigurationSupport implements ApplicationContextAware {//隐藏部分代码@Nullablepublic final ApplicationContext getApplicationContext() {return this.applicationContext;}//隐藏部分代码@Beanpublic DispatcherHandler webHandler() {return new DispatcherHandler();}

public class DispatcherHandler implements WebHandler, ApplicationContextAware {@Nullableprivate List<HandlerMapping> handlerMappings;@Nullableprivate List<HandlerAdapter> handlerAdapters;@Nullableprivate List<HandlerResultHandler> resultHandlers;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) {initStrategies(applicationContext);}protected void initStrategies(ApplicationContext context) {//注入handlerMappingsMap<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(context, HandlerMapping.class, true, false);ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());AnnotationAwareOrderComparator.sort(mappings);this.handlerMappings = Collections.unmodifiableList(mappings);//注入handlerAdaptersMap<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(context, HandlerAdapter.class, true, false);this.handlerAdapters = new ArrayList<>(adapterBeans.values());AnnotationAwareOrderComparator.sort(this.handlerAdapters);//注入resultHandlersMap<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(context, HandlerResultHandler.class, true, false);this.resultHandlers = new ArrayList<>(beans.values());AnnotationAwareOrderComparator.sort(this.resultHandlers);}

流程相关源码解析-HTTPHandlerAutoConfiguration

上面已讲解过WebFlux 核心组件装载过程,那么这些组件又是什么时候注入到对应的容器上下文中的呢?其实是在刷新容器上下文时注入进去的。

org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#onRefresh

public class ReactiveWebServerApplicationContext extends GenericReactiveWebApplicationContextimplements ConfigurableWebServerApplicationContext {@Overrideprotected void onRefresh() {super.onRefresh();try {createWebServer();}catch (Throwable ex) {throw new ApplicationContextException("Unable to start reactive web server", ex);}}private void createWebServer() {WebServerManager serverManager = this.serverManager;if (serverManager == null) {String webServerFactoryBeanName = getWebServerFactoryBeanName();ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit();// 这里创建容器管理时注入httpHandlerthis.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit);getBeanFactory().registerSingleton("webServerGracefulShutdown",new WebServerGracefulShutdownLifecycle(this.serverManager));// 注册一个 web容器启动服务类,该类继承了SmartLifecyclegetBeanFactory().registerSingleton("webServerStartStop",new WebServerStartStopLifecycle(this.serverManager));}initPropertySources();}protected HttpHandler getHttpHandler() {String[] beanNames = getBeanFactory().getBeanNamesForType(HttpHandler.class);if (beanNames.length == 0) {throw new ApplicationContextException("Unable to start ReactiveWebApplicationContext due to missing HttpHandler bean.");}if (beanNames.length > 1) {throw new ApplicationContextException("Unable to start ReactiveWebApplicationContext due to multiple HttpHandler beans : "+ StringUtils.arrayToCommaDelimitedString(beanNames));}//容器上下文获取httpHandlerreturn getBeanFactory().getBean(beanNames[0], HttpHandler.class);}

而这个HTTPHandler 是由HTTPHandlerAutoConfiguration装配进去的。

@Configuration
@ConditionalOnClass({ DispatcherHandler.class, HttpHandler.class })
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
@ConditionalOnMissingBean(HttpHandler.class)
@AutoConfigureAfter({ WebFluxAutoConfiguration.class })
@AutoConfigureOrder(Ordered.HIGHEST_PRECEDENCE + 10)
public class HttpHandlerAutoConfiguration {@Configurationpublic static class AnnotationConfig {private ApplicationContext applicationContext;public AnnotationConfig(ApplicationContext applicationContext) {this.applicationContext = applicationContext;}//构建WebHandler@Beanpublic HttpHandler httpHandler() {return WebHttpHandlerBuilder.applicationContext(this.applicationContext).build();}}

流程相关源码解析-web容器

org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer 。在创建WebServerManager 容器管理器时会获取对应web容器实例,并注入响应的HTTPHandler。

class WebServerManager {private final ReactiveWebServerApplicationContext applicationContext;private final DelayedInitializationHttpHandler handler;private final WebServer webServer;WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory,Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {this.applicationContext = applicationContext;Assert.notNull(factory, "Factory must not be null");this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);this.webServer = factory.getWebServer(this.handler);}
}

以Tomcat 容器为例展示创建过程,使用的是 TomcatHTTPHandlerAdapter 来连接Servlet 请求到HTTPHandler组件。

public class TomcatReactiveWebServerFactory extends AbstractReactiveWebServerFactory implements ConfigurableTomcatWebServerFactory {//隐藏部分代码   @Overridepublic WebServer getWebServer(HttpHandler httpHandler) {if (this.disableMBeanRegistry) {Registry.disableRegistry();}Tomcat tomcat = new Tomcat();File baseDir = (this.baseDirectory != null) ? this.baseDirectory : createTempDir("tomcat");tomcat.setBaseDir(baseDir.getAbsolutePath());Connector connector = new Connector(this.protocol);connector.setThrowOnFailure(true);tomcat.getService().addConnector(connector);customizeConnector(connector);tomcat.setConnector(connector);tomcat.getHost().setAutoDeploy(false);configureEngine(tomcat.getEngine());for (Connector additionalConnector : this.additionalTomcatConnectors) {tomcat.getService().addConnector(additionalConnector);}TomcatHttpHandlerAdapter servlet = new TomcatHttpHandlerAdapter(httpHandler);prepareContext(tomcat.getHost(), servlet);return getTomcatWebServer(tomcat);}
}

最后Spring容器加载后通过SmartLifecycle实现类WebServerStartStopLifecycle 来启动Web容器。

WebServerStartStopLifecycle 注册过程详见:org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext#createWebServer

3.2 完整请求处理流程

(引用自:https://blog.csdn.net)

该图给出了一个HTTP请求处理的调用链路。是采用Reactor Stream 方式书写,只有最终调用 subscirbe 才真正执行业务逻辑。基于WebFlux 开发时要避免controller 中存在阻塞逻辑。列举下面例子可以看到Spring MVC 和Spring Webflux 之间的请求处理区别。

@RestControllerpublic
class TestController {private Logger logger = LoggerFactory.getLogger(this.getClass());@GetMapping("sync")public String sync() {logger.info("sync method start");String result = this.execute();logger.info("sync method end");return result;}@GetMapping("async/mono")public Mono<String> asyncMono() {logger.info("async method start");Mono<String> result = Mono.fromSupplier(this::execute);logger.info("async method end");return result;}private String execute() {try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}return "hello";}
}

日志输出

2021-05-31 20:14:52.384  INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController  : sync method start
2021-05-31 20:14:57.385  INFO 3508 --- [nio-8080-exec-2] c.v.internet.webflux.web.TestController  : sync method end
2021-05-31 20:15:09.659  INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController  : async method start
2021-05-31 20:15:09.660  INFO 3508 --- [nio-8080-exec-3] c.v.internet.webflux.web.TestController  : async method end

从上面例子可以看出sync() 方法阻塞了请求,而asyncMono() 没有阻塞请求并立刻返回的。asyncMono() 方法具体业务逻辑 被包裹在了Mono 中Supplier中的了。当execute 处理完业务逻辑后通过回调方式响应给浏览器。

四、存储支持

一旦控制层使用了 Spring Webflux 则安全认证层、数据访问层都必须使用 Reactive API 才真正实现异步非阻塞。

NOSQL Database

  • MongoDB (org.springframework.boot:spring-boot-starter-data-mongodb-reactive)。

  • Redis(org.springframework.boot:spring-boot-starter-data-redis-reactive)。

Relational Database

  • H2 (io.r2dbc:r2dbc-h2)

  • MariaDB (org.mariadb:r2dbc-mariadb)

  • Microsoft SQL Server (io.r2dbc:r2dbc-mssql)

  • MySQL (dev.miku:r2dbc-mysql)

  • jasync-sql MySQL (com.github.jasync-sql:jasync-r2dbc-mysql)

  • Postgres (io.r2dbc:r2dbc-postgresql)

  • Oracle (com.oracle.database.r2dbc:oracle-r2dbc)

五、总结

关于Spring MVC 和Spring WebFlux 测评很多,本文引用下做简单说明。参考:《Spring: Blocking vs non-blocking: R2DBC vs JDBC and WebFlux vs Web MVC》。

基本依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-r2dbc</artifactId> 
</dependency> 
<!-- r2dbc 连接池 --> 
<dependency> <groupId >io.r2dbc</groupId> <artifactId>r2dbc-pool</artifactId> 
</dependency> 
<!--r2dbc mysql 库--> 
<dependency> <groupId>dev.miku</groupId> <artifactId>r2dbc- mysql</artifactId> 
</dependency> 
<!--自动配置需要引入一个嵌入式数据库类型对象--> 
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jdbc</artifactId> 
</dependency>
<!-- 反应方程式 web 框架 webflux--> 
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> 
</dependency>

相同数据下效果如下

Spring MVC + JDBC 在低并发下表现最好,但 WebFlux + R2DBC 在高并发下每个处理请求使用的内存最少。

Spring WebFlux + R2DBC 在高并发下,吞吐量表现优异。



 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/624010.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QT creator 代码中有中文,提示常量中有换行符解决方案

QT creator 代码中有中文&#xff0c;提示常量中有换行符解决方案 参考视频问题问题解决 参考 感谢感谢,非常感谢,有你,让Qt不再困难,困扰我四年的问题解决了!!! https://blog.csdn.net/m0_45866718/article/details/112389513 视频 https://www.bilibili.com/video/BV1Fp4…

Towards Street-Level Client-Independent IP Geolocation(2011年)(第一部分)

被引次数:306 Wang Y, Burgener D, Flores M, et al. Towards {Street-Level}{Client-Independent}{IP} Geolocation[C]//8th USENIX Symposium on Networked Systems Design and Implementation (NSDI 11). 2011. Abstract 一个高度精确的客户端独立的地理定位服务将是互联…

MGRE环境下的ospf实验

MGRE环境下的ospf实验 一.拓扑图 二.实验步骤 1.分配各路由网段IP [R1]int g 0/0/0 [R1-GigabitEthernet0/0/0]ip address 16.0.0.1 24 [R1-GigabitEthernet0/0/0]int g 0/0/1 [R1-GigabitEthernet0/0/1]ip address 116.0.0.1 24[R2]int g 0/0/0 [R2-GigabitEthernet0/0/0]…

【Linux】磁盘阵列RAID技术

目录 一、RAID介绍 1.1 什么是RAID技术&#xff1f; 1.2 为什么要使用RAID技术&#xff1f; 二、RAID级别 2.1 常见的RAID级别 2.2 常见RAID介绍 三、RAID特性对比 一、RAID介绍 1.1 什么是RAID技术&#xff1f; 把多块独立的物理磁盘按不同的方式组合起来形成一个硬盘…

基于Python的景区票务人脸识别系统(V2.0)

博主介绍&#xff1a;✌IT徐师兄、7年大厂程序员经历。全网粉丝15W、csdn博客专家、掘金/华为云//InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;&#x1f3…

【C语言】带你完全理解指针(五)练习

复习一下对数组名的理解 数组名的理解 数组名是数组首元素的地址 但是有2个例外&#xff1a; 1. sizeof(数组名)&#xff0c;这里的数组名表示整个数组&#xff0c;计算的是整个数组的大小&#xff0c;单位是字节 2. &数组名&#xff0c;这里的数组名表示整个数组&#xff…

【应用】SpringBoot-自动配置原理

前言 本文简要介绍SpringBoot的自动配置原理。 本文讲述的SpringBoot版本为&#xff1a;3.1.2。 前置知识 在看原理介绍之前&#xff0c;需要知道Import注解的作用&#xff1a; 可以导入Configuration注解的配置类、声明Bean注解的bean方法&#xff1b;可以导入ImportSele…

艾体宝方案 | ITT-Profitap IOTA——铁路运输的远程网络捕获和故障排除方案

在移动互联时代&#xff0c;铁路运输的数字化转型已成不可逆转的趋势。然而&#xff0c;随之而来的是对网络连接质量和故障排查的更高要求。本文将探讨如何利用艾体宝Profitap IOTA技术&#xff0c;在火车上实现远程网络捕获和故障排查&#xff0c;助力铁路运输行业迈向智能化未…

GPS定位原理及应用分析

一&#xff0e;定位原理 1.卫星定位&#xff08;GPS&#xff0c;北斗导航&#xff09; ①&#xff0e;硬件构成&#xff08;24颗卫星&#xff0c;可构建一套导航系统&#xff09; 为何是24颗卫星&#xff1f; 可以做到全球覆盖&#xff0c;同一地点地球上空可观测到4颗卫星。 …

数据分析(2)

数据分析&#xff08;2&#xff09; 本文介绍pandas的另一种数据类型DataFrame,中文叫数据框 DataFrame 定义&#xff1a; DataFrame是一个二维的矩阵数据表&#xff0c;通过行和列&#xff0c;可以定位一个值。 在某种程度上&#xff0c;可以认为DataFrame是“具有相同ind…

基于python的二手房数据分析建模及可视化研究,爬取链家二手房数据,可视化分析,房价预测模型

介绍 主要涉及通过爬取济南市链家二手房数据&#xff0c;然后对数据进行处理&#xff0c;包括缺省值处理&#xff0c;高德地图获取二手房地址所属市区&#xff0c;经纬度等数据处理。然后通过python的flask框架编写后端接口&#xff0c;把数据响应给前端。然后前端通过AJAX请求…

Xshell无法输入命令输入命令卡顿

Xshell是一款功能强大的终端模拟软件&#xff0c;可以让用户通过SSH、Telnet、Rlogin、SFTP等协议远程连接到Linux、Unix、Windows等服务器。然而&#xff0c;在使用Xshell的过程中&#xff0c;我们可能会遇到一些问题。比如输入不了命令&#xff0c;或者输入命令很卡。这些问题…