异步编程 - 07 基于JDK中的Future实现异步编程(下)_当Stream遇见CompletableFuture

文章目录

  • JDK8 Stream
  • Stream遇见CompletableFuture
  • 小结

在这里插入图片描述


JDK8 Stream

JDK8中提供了流式对数据进行处理的功能,它的出现允许我们以声明式方式对数据集合进行处理。所谓声明式是相对于我们平时所用的命令式编程来说的,使用声明式编程会让我们对业务的表达更清晰。另外使用流可以让我们很方便地对数据集进行并行处理。

比如下面的代码,我们从person列表中过滤出年龄大于10岁的人,并且收集对应的name字段到list,然后统一打印处理。在使用非Stream的情况下,我们会使用如下代码来实现。

public static List<Person> makeList() {List<Person> personList = new ArrayList<Person>();Person p1 = new Person();p1.setAge(10);p1.setName("zlx");personList.add(p1);p1 = new Person();p1.setAge(12);p1.setName("jiaduo");personList.add(p1);p1 = new Person();p1.setAge(5);p1.setName("ruoran");personList.add(p1);return personList;
}public static void noStream(List<Person> personList) {List<String> nameList = new ArrayList<>();for (Person person : personList) {if (person.age >= 10) {nameList.add(person.getName());}}for(String name: nameList) {System.out.println(name);}}public static void main(String[] args) {List<Person> personList = makeList();noStream(personList);}

从上述代码可知,noStream方法是典型的命令式编码,我们用for循环来一个个判断当前person对象中的age字段值是否大于等于10,如果是则把当前对象的name字段放到手动创建的nameList列表中,然后再开启新的for循环逐个遍历nameList中的name字段。

下面我们使用Stream方式来修改上面的代码。

public static void useStream(List<Person> personList) {List<String> nameList = personList.stream().filter(person -> person.getAge() >= 10)// 1.过滤大于等于10的age字段值.map(person -> person.getName())// 2.使用map映射元素.collect(Collectors.toList());// 3.收集映射后元素nameList.stream().forEach(name -> System.out.println(name));
}

在上面的代码中我们首先从personList获取到流对象,然后在其上进行了filter运算,过滤出年龄大于等于10的person,然后运用map方法映射person对象到name字段,再使用collect方法收集所有的name字段到nameList,最后从nameList上获取流并调用forEach进行打印。

上面的代码就是声明式编程,其可读性很强,代码直接可以说明想要什么(从代码就可以知道我们要过滤出年龄大于等于10岁的人,并且把满足条件的person的name字段收集起来,然后打印)。

需要注意的是,这里的filter和map操作是中间操作符,也就是当我们在流上施加这些操作时并不会真的被执行。而collect操作是终端操作符,当在流上执行终端操作符时,流上施加的操作才会执行。

JDK8中对于Steam提供了很多操作符,只是简单的列出了filter、map、collect这几种方法。


Stream遇见CompletableFuture

下面我们来看看当Stream与CompletableFuture相结合时会产生什么样的火花。

首先我们来看一个需求,这个需求是消费端对服务提供方集群中的某个服务进行广播调用(轮询调用同一个服务的不同提供者的机器),正常同步调用代码如下所示。

public class StreamTestFuture {public static String rpcCall(String ip, String param) {System.out.println(ip + " rpcCall:" + param);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return param;}public static void main(String[] args) {// 1.生成ip列表List<String> ipList = new ArrayList<String>();for (int i = 1; i <=10; ++i) {ipList.add("192.168.0." + i);}// 2.发起广播调用long start = System.currentTimeMillis();List<String> result = new ArrayList<>();for (String ip : ipList) {result.add(rpcCall(ip, ip));}// 3.输出result.stream().forEach(r -> System.out.println(r));System.out.println("cost:" + (System.currentTimeMillis() - start));}
  • 代码1生成ip列表,这代表了所有服务提供者的机器ip。

  • 代码2轮询每个ip,使用ip作为参数调用rpcCall方法(这里面使用休眠1s来模拟远程rpc过程执行)并且把结果保存到result中。

  • 代码3则等所有服务调用完成后打印执行结果,运行上面代码时会发现耗时大概为10s,这是因为代码2发起广播调用是顺序的,也就是当上次rpc调用返回结果后才会进行下一次调用。

下面我们借用Stream和CompletableFuture来看看业务线程如何并发地发起多次rpc请求,从而缩短整个处理流程的耗时。

   // 1.生成ip列表List<String> ipList = new ArrayList<String>();for (int i = 1; i <= 10; ++i) {ipList.add("192.168.0." + i);}// 2.并发调用long start = System.currentTimeMillis();List<CompletableFuture<String>> futureList = ipList.stream().map(ip -> CompletableFuture.supplyAsync(() -> rpcCall(ip, ip)))//同步转换为异步.collect(Collectors.toList());//收集结果//3.等待所有异步任务执行完毕List<String> resultList = futureList.stream().map(future -> future.join())//同步等待结果.collect(Collectors.toList());//对结果进行收集// 4.输出resultList.stream().forEach(r -> System.out.println(r));System.out.println("cost:" + (System.currentTimeMillis() - start));
  • 代码2从ipList处获取了stream,然后通过map操作符把ip转换为远程调用。

  • 注意,这里通过使用CompletableFuture.supplyAsync方法把rpc的同步调用转换为了异步,也就是把同步调用结果转换为了CompletableFuture对象,所以操作符map返回的是一个CompletableFuture,然后collect操作把所有的CompletableFuture对象收集为list后返回。

  • 此外,这里多个rpc调用时是并发执行的,不是顺序执行,因为CompletableFuture.supplyAsync方法把rpc的同步调用转换为了异步。

  • 代码3从futureList获取流,然后使用map操作符把future对象转换为future的执行结果,这里是使用future的join方法来阻塞获取每个异步任务执行完毕,然后返回执行结果,最后使用collect操作把所有的结果收集到resultList。

  • 代码4从resultList获取流,然后打印结果。

  • 运行上面的代码会发现耗时大大减少了,这可以证明上面10个rpc调用时是并发运行的,并不是串行执行。

注意:具体这10个rpc请求是否全部并发运行取决于CompletableFuture内线程池内线程的个数,如果你的机器是单核的或者线程池内线程个数为1,那么这10个任务还是会顺序执行的。


小结

我们了解了CompletableFuture如何解决其缺点,以及CompletableFuture与JDK Stream是如何完美结合的,可知使用CompletableFuture实现异步编程属于声明式编程,一般情况下不需要我们显式创建线程池并提交任务到线程池,这大大减轻了编程者的负担。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/101485.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

springboot之一:配置文件(内外部配置优先顺序+properties、xml、yaml基础语法+profile动态切换配置、激活方式)

配置的概念&#xff1a; Spring Boot是基于约定的&#xff0c;所以很多配置都有默认值&#xff0c;但如果想使用自己的配置替换默认配置的话&#xff0c;就可以使用application.properties或者application.yml(application.yaml)进行配置。 注意配置文件的命名必须是applicat…

百望云亮相服贸会 重磅发布业财税融Copilot

小望小望&#xff0c;我要一杯拿铁&#xff01; 好的&#xff0c;已下单成功&#xff0c;请问要开具发票嘛&#xff1f; 在获得确认的指令后&#xff0c; 百小望AI智能助手 按用户要求成功开具了一张电子发票&#xff01; 这是2023年服贸会国家会议中心成果发布现场&#x…

接入 NVIDIA A100、吞吐量提高 10 倍!Milvus GPU 版本使用指南

Milvus 2.3 正式支持 NVIDIA A100&#xff01; 作为为数不多的支持 GPU 的向量数据库产品&#xff0c;Milvus 2.3 在吞吐量和低延迟方面都带来了显著的变化&#xff0c;尤其是与此前的 CPU 版本相比&#xff0c;不仅吞吐量提高了 10 倍&#xff0c;还能将延迟控制在极低的水准。…

基于SpringBoot+VUE的考试题库刷题系统

✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取项目下载方式&#x1f345; 一、项目背景介绍&#xff1a; 在现代教育领域&#…

SQL注入案例

目录 一、简介 二、案例 1.发现注入点 2.寻找注入类型 3.寻找字段数 4.将传参值设为超出数据量的大值&#xff0c;联合查询找到回显位置 5.找到数据库 6.寻找库中的表 7.寻找表中列 8.查看表中数据 附&#xff1a;SQLMap注入 1.输入指令查数据库 2.输入指令查表 3…

iOS 16.4更新指南:问题解答与新功能一览

我应该更新到iOS 16.4吗&#xff1f;这是许多iPhone用户在新更新可用时问自己的一个常见问题。最新的iOS版本提供了各种功能和改进&#xff0c;因此更新的诱惑力很大。 但是&#xff0c;在更新之前&#xff0c;你应该考虑几个因素&#xff0c;以确保安装过程顺利成功。这些因素…

表面之下:理解低代码代理世界中低佣金的经济学

低代码市场在中国自2019年左右兴起&#xff0c;至今已近五年。从最初的质疑&#xff0c;到如今的广泛应用&#xff0c;其业务价值已得到市场普遍认可。根据爱分析测算&#xff0c;2023年中国低代码市场规模为50.2亿元人民币&#xff0c;年增速为39.9%。低代码市场在满足企业需求…

【仿写spring之ioc篇】四、实现bean的初始化阶段

BeanPostProcessor 在Bean的初始化阶段有前置和后置方法&#xff0c;这个方法是通过BeanPostProcessor来管理的&#xff0c;下面我们对原有的项目结构做小小的更改。 对启动类作出修改&#xff0c;先检查有没有BeanPostProcessor的实现类&#xff0c;有的话就使用&#xff0c…

postgre 12.11单实例安装文档

一 下载 访问https://www.postgresql.org/download/&#xff0c;点击左侧的‘source进行下载&#xff0c;一般选择bz2的安装包。 二 安装 这里安装12.11版本的postgre&#xff0c;数据目录路径为/data/server/pgdata&#xff0c;端口为5432. 2.1 安装依赖包 #安装 yum in…

Redis图文指南

1、什么是 Redis&#xff1f; Redis&#xff08;REmote DIctionary Service&#xff09;是一个开源的键值对数据库服务器。 Redis 更准确的描述是一个数据结构服务器。Redis 的这种特殊性质让它在开发人员中很受欢迎。 Redis不是通过迭代或者排序方式处理数据&#xff0c;而是…

Jmeter系列-Jmeter面板介绍和常用配置(2)

Jmeter面板介绍 常用菜单栏 分布式运行相关的 选项&#xff0c;可以打开日志&#xff0c;修改语言、函数助手对话框&#xff0c;还有管理插件 常用的图标 从左到右依次 新建测试计划选择测试计划模板创建一个新的测试计划打开jmeter脚本保存jmeter脚本剪切复制粘贴展开目录…

IP地址、子网掩码、网络地址、广播地址、IP网段

文章目录 IP地址IP地址分类子网掩码网络地址广播地址IP网段 本文主要讨论iPv4地址。 IP地址 实际的 IP 地址是一串32 比特的数字&#xff0c;按照 8 比特&#xff08;1 字节&#xff09;为一组分成 4 组&#xff0c;分别用十进制表示然后再用圆点隔开&#xff0c;这就是我们平…