CompletableFuture 异步编排

目录

  • CompletableFuture 的详解
  • 代码测试
  • 配置类的引入
  • Demo1
  • Demo2
  • CompletableFuture的async后缀函数与不带async的函数的区别
  • ThreadPoolTaskExecutor 和 ThreadPoolExecutor 的区别
  • Spring 线程池的使用
  • 业务使用多线程的原因
  • 场景一:
  • 场景二:
  • FutureTask介绍
  • 线程池为什么要使用阻塞队列
  • Spring 常用的线程池的使用
  • 序列
  • 常规使用
  • 异步使用

CompletableFuture 的详解

 

🍫 它就是创建一个异步任务,然后在干什么,可以使用多任务组合

 
  • 创建任务的方法
static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
  • 然后继续上一段的任务(里面包含了串行,AND,OR)

🚩 串行:

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
 

参数解析:

  • thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前任务的返回值。(接收上一阶段任务结果,返回结果)
  • thenAccept方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。 (接收上一阶段任务结果,不返回结果)
  • thenRun方法:不接收上一阶段任务结果,并且无返回值 带有Async默认是异步执行的。这里所谓的异步指的是不在当前线程内执行。

🚡 AND

public <U,V> CompletionStage<V> thenCombineAsync (CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
- 上一阶段任务与other任务均执行结束,接收两个任务的结果,并可获取返回值public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor);
- 使用上一阶段任务的结果,返回一个新的CompletableFuture实例
 

更多的参数详解: 博客链接

 
  • 多任务组合
 
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

代码测试

 
配置类的引入

yml

thread:pool:corePoolSize: 4maxPoolSize: 8workQueue: 25keepAliveTime: 30
 

config

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
@ConfigurationProperties(prefix = "thread.pool")
public class AsyncConfig{//核心线程数量大小private   int corePoolSize = 4;//线程池最大容纳线程数private   int maxPoolSize =8;//阻塞队列private   int workQueue = 25;//线程空闲后的存活时长private   int keepAliveTime = 30;@Bean("asyncTaskExecutor")public Executor getAsyncExecutor() {ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();//核心线程数threadPoolTaskExecutor.setCorePoolSize(corePoolSize);//最大线程数threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);//等待队列threadPoolTaskExecutor.setQueueCapacity(workQueue);//线程前缀threadPoolTaskExecutor.setThreadNamePrefix("taskExecutor-");//线程池维护线程所允许的空闲时间,单位为秒threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveTime);// 线程池对拒绝任务(无线程可用)的处理策略threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());threadPoolTaskExecutor.initialize();return threadPoolTaskExecutor;}
}

🍫 参数详解: 建议看看下面的线程池对拒绝任务(无线程可用)的处理策略

  1、corePoolSize:核心线程数* 核心线程会一直存活,及时没有任务需要执行* 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理* 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭2、queueCapacity:任务队列容量(阻塞队列)* 当核心线程数达到最大时,新任务会放在队列中排队等待执行3、maxPoolSize:最大线程数* 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务* 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常4、 keepAliveTime:线程空闲时间* 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize* 如果allowCoreThreadTimeout=true,则会直到线程数量=05、allowCoreThreadTimeout:允许核心线程超时6、rejectedExecutionHandler:任务拒绝处理器* 两种情况会拒绝处理任务:- 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务- 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务* 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常* ThreadPoolExecutor类有几个内部实现类来处理这类情况:- AbortPolicy 丢弃任务,抛运行时异常(默认)- CallerRunsPolicy 执行任务- DiscardPolicy 忽视,什么都不会发生- DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务* 实现RejectedExecutionHandler接口,可自定义处理器


 

Demo1
  

image-20221115170031342

 
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {System.out.println("洗水壶");try {Thread.sleep(1000);} catch (InterruptedException ex) {ex.printStackTrace();}return "水壶";}).thenApply(e->{System.out.println("烧水");try {Thread.sleep(5000);} catch (InterruptedException ex) {ex.printStackTrace();}return "热水";});//洗水壶->洗水杯->拿茶叶CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {System.out.println("洗茶壶");try {Thread.sleep(1000);} catch (InterruptedException ex) {ex.printStackTrace();}return "茶壶";}).thenApply(e->{try {Thread.sleep(2000);} catch (InterruptedException ex) {ex.printStackTrace();}System.out.println("洗水杯");return "水杯";}).thenApply(e->{System.out.println("拿茶叶");return "茶叶";});//泡茶CompletableFuture<String> task3 = task1.thenCombine(task2, (a, b) -> {System.out.println("泡茶");return "茶";});String tea = task3.join();System.out.println(tea);
 

🍫 参数解析:

更多的参数详解: 博客链接

Demo2
 
  • 这个测试我就没有写了,自己可以看看

问题:当查询接口较复杂时候,数据的获取都需要远程调用,必然需要花费更多的时间。

假如查询文章详情页面,需要如下标注的时间才能完成:

 


// 1. 查询文章详情 0.5s

// 2. 查询文章博主个人信息 0.5s

// 3. 查询文章评论 1s

// 4. 查询博主相关文章分类 1s

// 5. 相关推荐文章 1s

// ......
上面的描述只是举个例子不要在意这里的查询描述,看实际情况使用,有些相关的查询我们可以拆分接口实现,上面的描述只是为了举例子。
 

@Service
public class ArticleService {@Autowiredprivate ArticleClient articleClient;@Autowiredprivate UserClient userClient;@Autowiredprivate ThreadPoolExecutor threadPoolExecutor;public ItemVo load(Long id) {// 1. 查询文章详情 0.5s// 下面的查询需要用到文章对应的发布用户,所以这里需要使用CompletableFuture.supplyAsyncCompletableFuture<ArticleEntity> articleCompletableFuture = CompletableFuture.supplyAsync(() -> {ResponseVo<ArticleEntity> skuEntityResponseVo = this.articleClient.getArticleById(id);ArticleEntity articleEntity = skuEntityResponseVo.getData();if (articleEntity == null) {return null;}itemVo.setId(id);itemVo.setTitle(articleEntity.getTitle());itemVo.setDefaltImage(articleEntity.getDefaultImage());return articleEntity;}, threadPoolExecutor);// 2. 查询文章博主个人信息 0.5s// 这里查询需要依赖文章关联的用户id,所以需要使用articleCompletableFuture.thenAcceptAsync()CompletableFuture<Void> userCompletableFuture = articleCompletableFuture.thenAcceptAsync(articleEntity -> {ResponseVo<UserEntity> categoryResponseVo = this.userClient.queryUserInfoById(articleEntity.getUserId());UserEntity userEntity = categoryResponseVo.getData();itemVo.setUserInfo(userEntity);}, threadPoolExecutor);    // 3. 查询博主相关文章分类 1s// 这里查询需要依赖文章关联的用户id,所以需要使用articleCompletableFuture.thenAcceptAsync()CompletableFuture<Void> userOtherArticleCompletableFuture = articleCompletableFuture.thenAcceptAsync(articleEntity -> {ResponseVo<List<UserAuserOtherArticleEntity>> categoryResponseVo = this.articleClient.queryUserAuserOtherArticleById(articleEntity.getUserId());UserAuserOtherArticleEntity userAuserOtherArticleEntity = categoryResponseVo.getData();itemVo.setUserAuserOtherArticleList(userAuserOtherArticleEntity);}, threadPoolExecutor);// 4. 查询文章评论 1s// 不需要依赖其他请求返回值,可以使用新的异步对象 CompletableFuture.runAsync()CompletableFuture<Void> commentsCompletableFuture =  CompletableFuture.runAsync(() -> {ResponseVo<List<UserArticleCategoryEntity>> userArticleCategoryVo = this.userClient.queryCommentsByArticleId(id);UserArticleCategoryEntity userArticleCategoryEntity = userArticleCategoryVo.getData();itemVo.setUserArticleCategoryList(userArticleCategoryEntity);}, threadPoolExecutor);// 5. 相关推荐文章 1s// 不需要依赖其他请求返回值,可以使用新的异步对象 CompletableFuture.runAsync()CompletableFuture<Void> relatedArticlesCompletableFuture =  CompletableFuture.runAsync(() -> {ResponseVo<List<RelatedArticlesEntity>> userArticleCategoryVo = this.articleClient.queryRelatedArticles(id);UserArticleCategoryEntity userArticleCategoryEntity = userArticleCategoryVo.getData();itemVo.setUserArticleCategoryList(userArticleCategoryEntity);}, threadPoolExecutor);}// 多任务执行组合 CompletableFuture.allOf()CompletableFuture.allOf(articleCompletableFuture, userCompletableFuture, userOtherArticleCompletableFuture,commentsCompletableFuture, relatedArticlesCompletableFuture).join();return itemVo;
}


 

CompletableFuture的async后缀函数与不带async的函数的区别

 

参考链接: 博客链接

 

🍫 结论:

不带async的函数的动作比较复杂f的whenComplete的内容由哪个线程来执行,取决于哪个线程X执行了f.complete()。但是当X线程执行了f.complete()的时候,whenComplete还没有被执行到的时候(就是事件还没有注册的时候),那么X线程就不会去同步执行whenComplete的回调了。这个时候哪个线程执行到了whenComplete的事件注册的时候,就由哪个线程自己来同步执行whenComplete的事件内容。而whenCompleteAsync的场合,就简单很多。一句话就是线程池里面拿一个空的线程或者新启一个线程来执行回调。和执行f.complete的线程以及执行whenCompleteAsync的线程无关。



 

ThreadPoolTaskExecutor 和 ThreadPoolExecutor 的区别

 

参考链接: 博客链接

🍫 结论:

其实最主要的原因很直观:ThreadPoolExecutor是一个不受Spring管理生命周期、参数装配的Java类,而有了ThreadPoolTaskExecutor的封装,线程池才有Spring“内味”。

 

Spring 线程池的使用


 

业务使用多线程的原因

 
  • 目的是面对高并发的时候,提高运行速度

场景一:

一个业务逻辑有很多次的循环,每次循环之间没有影响,比如验证1万条url路径是否存在,正常情况要循环1万次,逐个去验证每一条URL,这样效率会很低,假设验证一条需要1分钟,总共就需要1万分钟,有点恐怖。这时可以用多线程,将1万条URL分成50等份,开50个线程,没个线程只需验证200条,这样所有的线程执行完是远小于1万分钟的。


 

场景二:

需要知道一个任务的执行进度,比如我们常看到的进度条,实现方式可以是在任务中加入一个整型属性变量(这样不同方法可以共享),任务执行一定程度就给变量值加1,另外开一个线程按时间间隔不断去访问这个变量,并反馈给用户。总之使用多线程就是为了充分利用cpu的资源,提高程序执行效率,当你发现一个业务逻辑执行效率特别低,耗时特别长,就可以考虑使用多线程。

问题:不过CPU执行哪个线程的时间和顺序是不确定的,即使设置了线程的优先级,因此使用多线程的风险也是比较大的,会出现很多预料不到的问题,一定要多熟悉概念,多构造不同的场景去测试才能够掌握!

项目中可以通过:

@Order()
设置运行的优先级,数字越小,级别越高


 

FutureTask介绍

 

🍫 参考: 博客链接

线程池为什么要使用阻塞队列

 

阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait 状态,释放 cpu 资源,当队列中有任务时才唤醒对应线程从队列中取出消息进行执行。

使得在线程不至于一直占用cpu资源。(线程执行完任务后通过循环再次从任务队列中取出任务进行执行,代码片段如:while (task != null || (task = getTask()) != null) {})。

不用阻塞队列也是可以的,不过实现起来比较麻烦而已,有好用的为啥不用呢
 

Spring 常用的线程池的使用

 

序列

Spring 通过任务执行器(TaskExecutor)来实现多线程和并发编程,使用 ThreadPoolTaskExecutor 实现一个基于线程池的TaskExecutor,
还得需要使用 @EnableAsync 开启异步,并通过在需要的异步方法那里使用注解@Async声明是一个异步任务

Spring 已经实现的异常线程池:

  •  
  • - SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。
  • - SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方
  • - ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类
  • - SimpleThreadPoolTaskExecutor:是Quartz的 SimpleThreadPool 的类。线程池同时被quartz和非quartz使用,才需要使用此类
  • - ThreadPoolTaskExecutor :最常使用,推荐。 其实质是对 java.util.concurrent.ThreadPoolExecutor 的包装
     

🍫 扩展:相信大家在 Java 里面也学过 JUC ,里面有 Java 里面的线程池,可以直接去看看 ThreadPoolExecutor

至于为什么有线程池,Spring 为什么还有在自己搞一个,可以自己去探索,Spring 的底层还是 ThreadPoolExecutor ,只是它的生命周期不受它控制。


 

常规使用

 
//    线程池(config里面的Bean)@Autowiredprivate Executor taskExecutor;Callable<ScyTeacher> scy =()-> scyTeacherMapper.selectOne(new LambdaQueryWrapper<ScyTeacher>().eq(ScyTeacher::getUsername,test));
FutureTask<ScyTeacher> commentCallable = new FutureTask<>(scy);
Future<Map> submit = executor.submit(commentCallable);
Map map = submit.get();

异步使用

 

🚩 记得开启异步(配置类添加)

@EnableAsync //开启异步执行
 
package com.melodyjerry.thread;import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;/*** @classname AsyncTaskService* @description 异步任务的执行类*/
@Service
public class AsyncTaskService {@Async //异步方法public void executeAsyncTask(Integer i) {System.out.javaprintln("执行异步任务: "+i);}@Async //异步方法public void executeAsyncTaskPlus(Integer i) {System.out.println("执行异步任务+1: " + (i+1));}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/124093.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nodejs+vue校园跑腿系统elementui

购物车品结算,管理个人中心&#xff0c;订单管理&#xff0c;接单处理&#xff0c;商品维护&#xff0c;用户管理&#xff0c;系统管理等功育食5&#xff09;要求系统运行可靠、性能稳定、界面友好、使用方便。 第三章 系统分析 10 3.1需求分析 10 3.2可行性分析 10 3.2.1技术…

正则表达式 Regular Expression学习

该文章内容为以下视频的学习笔记&#xff1a; 10分钟快速掌握正则表达式_哔哩哔哩_bilibili正则表达式在线测试工具&#xff1a;https://regex101.com/, 视频播放量 441829、弹幕量 1076、点赞数 19330、投硬币枚数 13662、收藏人数 26242、转发人数 2768, 视频作者 奇乐编程学…

Spring MVC 十:异常处理

异常是每一个应用必须要处理的问题。 Spring MVC项目&#xff0c;如果不做任何的异常处理的话&#xff0c;发生异常后&#xff0c;异常堆栈信息会直接抛出到页面。 比如&#xff0c;我们在Controller写一个异常&#xff1a; GetMapping(value"/hello",produces{&qu…

【数据结构与算法】- 数组

数组 1.1 数组的定义1.2 数组的创建1.3 数组在内存中的情况2.1 初始化数组2.2 插入元素2.3 删除元素2.4 读取元素2.5 遍历数组 1.1 数组的定义 数组中的是在内存中是连续存储的&#xff0c;内存是由一个个内存单元组成的&#xff0c;每一个内存单元都有自己的地址&#xff0c;…

优先级队列

优先级队列 一端进&#xff0c;另一端出 按优先级出队 普通队列 一端进&#xff0c;另一端出 FIFO 我们做个约定&#xff1a;数字大的&#xff0c;优先级越高 &#xff0c;优先出队 无序数组实现 要点 入队保持顺序 出队前找到优先级最高的出队&#xff0c;相当于一次选择排序…

Tensorflow2 GPU 安装方法

一、Tensorflow2 GPU 安装方法 1. 首先安装Anaconda3环境2. 在Anaconda Prompt 中安装tensorflow23. 验证GPU是否可以使用4. 错误解决 1. 首先安装Anaconda3环境 https://www.anaconda.com/ 2. 在Anaconda Prompt 中安装tensorflow2 conda update conda conda create -n ten…

Unity把UGUI再World模式下显示到相机最前方

Unity把UGUI再World模式下显示到相机最前方 通过脚本修改Shader 再VR里有时候要把3D的UI显示到相机最前方&#xff0c;加个UI相机会坏事&#xff0c;可以通过修改unity_GUIZTestMode来解决。 测试用例 测试用例如下&#xff1a; 场景包含一个红色的盒子&#xff0c;一个UI…

github搜索技巧

指定语言 language:java 比如我要找用java写的含有blog的内容 搜索项目名称包含关键词的内容 vue in:name 其他如项目描述跟项目文档&#xff0c;如下 组合使用 vue in:name,description,readme 根据Star 或者fork的数量来查找 总结 springboot vue stars:>1000 p…

关于字符拼接

当然&#xff0c;以下是加入了幽默注释的代码和对应的逻辑树&#xff1a; # 提示用户输入input和txt内容&#xff0c;期待用户真有输入 input_text input("请输入input文本&#xff1a;") # 好了&#xff0c;快点输入吧 txt_text input("请输入txt文本&#…

【数据结构】红黑树(C++实现)

​ ​&#x1f4dd;个人主页&#xff1a;Sherry的成长之路 &#x1f3e0;学习社区&#xff1a;Sherry的成长之路&#xff08;个人社区&#xff09; &#x1f4d6;专栏链接&#xff1a;数据结构 &#x1f3af;长路漫漫浩浩&#xff0c;万事皆有期待 上一篇博客&#xff1a;【数据…

Android逆向学习(五)app进行动态调试

Android逆向学习&#xff08;五&#xff09;app进行动态调试 一、写在前面 非常抱歉鸽了那么久&#xff0c;前一段时间一直在忙&#xff0c;现在终于结束了&#xff0c;可以继续更新android逆向系列的&#xff0c;这个系列我会尽力做下去&#xff0c;然后如果可以的话我看看能…

华为云HECS云服务器docker环境下安装nginx

前提&#xff1a;有一台华为云服务器。 华为云HECS云服务器&#xff0c;安装docker环境&#xff0c;查看如下文章。 华为云HECS安装docker-CSDN博客 一、拉取镜像 下载最新版Nginx镜像 (其实此命令就等同于 : docker pull nginx:latest ) docker pull nginx查看镜像 dock…