ExecutorService、Callable、Future实现有返回结果的多线程原理解析

原创/朱季谦

在并发多线程场景下,存在需要获取各线程的异步执行结果,这时,就可以通过ExecutorService线程池结合Callable、Future来实现。

我们先来写一个简单的例子——

public class ExecutorTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executor = Executors.newSingleThreadExecutor();Callable callable = new MyCallable();Future future = executor.submit(callable);System.out.println("打印线程池返回值:" + future.get());}
}class MyCallable implements Callable<String>{@Overridepublic String call() throws Exception {return "测试返回值";}
}

执行完成后,会打印出以下结果:

打印线程池返回值:测试返回值

可见,线程池执行完异步线程任务,我们是可以获取到异步线程里的返回值。

那么,ExecutorService、Callable、Future实现有返回结果的多线程是如何实现的呢?

首先,我们需要创建一个实现函数式接口Callable的类,该Callable接口只定义了一个被泛型修饰的call方法,这意味着,需要返回什么类型的值可以由具体实现类来定义——

@FunctionalInterface
public interface Callable<V> {V call() throws Exception;
}

因此,我自定义了一个实现Callable接口的类,该类的重写了call方法,我们在执行多线程时希望返回什么样的结果,就可以在该重写的call方法定义。

class MyCallable implements Callable<String>{@Overridepublic String call() throws Exception {return "测试返回值";}
}

在自定义的MyCallable类中,我在call方法里设置一个很简单的String返回值 “测试返回值”,这意味着,我是希望在线程池执行完异步线程任务时,可以返回“测试返回值”这个字符串给我。

接下来,我们就可以创建该MyCallable类的对象,然后通过executor.submit(callable)丢到线程池里,线程池里会利用空闲线程来帮我们执行一个异步线程任务。

ExecutorService executor = Executors.newSingleThreadExecutor();
Callable callable = new MyCallable();
Future future = executor.submit(callable);

值得注意一点是,若需要实现获取线程返回值的效果,只能通过executor.submit(callable)去执行,而不能通过executor.execute(Runnable command)执行,因为executor.execute(Runnable command)只能传入实现Runnable 接口的对象,但这类对象是不具备返回线程效果的功能。

进入到executor.submit(callable)底层,具体实现在AbstractExecutorService类中。可以看到,执行到submit方法内部时,会将我们传进来的new MyCallable()对象作为参数传入到newTaskFor(task)方法里——

public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;
}

这个newTaskFor(task)方法内部具体实现,是将new MyCallable()对象传入构造器中,生成了一个FutureTask对象。

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);
}

这个FutureTask对象实现RunableFuture接口,这个RunableFuture接口又继承了Runnable,说明FutureTask类内部会实现一个run方法,然后本身就可以当做一个Runnable线程任务,借助线程Thread(new FutureTask(.....)).start()方式开启一个新线程,去异步执行其内部实现的run方法逻辑。

public class FutureTask<V> implements RunnableFuture<V>{.....}public interface RunnableFuture<V> extends Runnable, Future<V> {/*** Sets this Future to the result of its computation* unless it has been cancelled.*/void run();
}

分析到这里,可以知道FutureTask的核心方法一定是run方法,线程执行start方法后,最后会去调用FutureTask的run方法。在讲解这个run方法前,我们先去看一下创建FutureTask的初始化构造方法底层逻辑new FutureTask(callable) ——

public class FutureTask<V> implements RunnableFuture<V> {private Callable<V> callable;......//省略其余源码public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();//通过构造方法初始化Callable<V> callable赋值this.callable = callable;this.state = NEW;       // ensure visibility of callable
}......//省略其余源码
}

可以看到,FutureTask(Callable callable)构造器,主要是将我们先前创建的new MyCallable()对象传进来,赋值给FutureTask内部定义的Callable callable引用,实现子类对象指向父类引用。这一点很关键,这就意味着,在初始化创建FutureTask对象后,我们是可以通过callable.call()来调用我们自定义设置可以返回“测试返回值”的call方法,这不就是我们希望在异步线程执行完后能够返回的值吗?

我们不妨猜测一下整体返数主流程,在Thread(new FutureTask(.....)).start()开启一个线程后,当线程获得了CPU时间片,就会去执行FutureTask对象里的run方法,这时run方法里可以通过callable.call()调用到我们自定义的MyCallable#call()方法,进而得到方法返回值 “测试返回值”——到这一步,只需要将这个返回值赋值给FutureTask里某个定义的对象属性,那么,在主线程在通过获取FutureTask里被赋值的X对象属性值,不就可以拿到返回字符串值 “测试返回值”了吗?

实现上,主体流程确实是这样,只不过忽略了一些细节而已。

image

接下来,让我们看一下FutureTask的run方法——

public void run() {//如果状态不是NEW或者设置runner为当前线程时,说明FutureTask任务已经取消,无法继续执行if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {//在该文中,callable被赋值为指向我们定义的new MyCallable()对象引用Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {//c.call最后会调用new MyCallable()的call()方法,得到字符串返回值“测试返回值”给resultresult = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}//正常执行完c.call()方法时,ran值为true,说明会执行set(result)方法。if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}

根据以上源码简单分析,可以看到run方法当中,最终确实会执行new MyCallable()的call()方法,得到字符串返回值“测试返回值”给result,然后执行set(result)方法,根据set方法名就不难猜出,这是一个会赋值给某个字段的方法。

这里分析会忽略一些状态值的讲解,这块会包括线程的取消、终止等内容,后面我会出一片专门针对FutureTask源码分析的文章再介绍,本文主要还是介绍异步线程返回结果的主要原理。

沿着以上分析,追踪至set(result)方法里——

protected void set(V v) {//通过CAS原子操作,将运行的线程设置为COMPLETING,说明线程已经执行完成中if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//若CAS原子比较赋值成功,说明线程可以被正常执行完成的话,然后将result结果值赋值给outcomeoutcome = v;//线程正常完成结束UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}
}

这个方法的主要是,若该线程执行能够正常完成话,就将得到的返回值赋值给outcome,这个outcome是FutureTask的一个Object变量——

private Object outcome;

至此,就完成了流程的这一步——

image

最后,就是执行主线程的根据ftask.get()获取执行完成的值,这个get可以设置超时时间,例如 ftask.get(2,TimeUnit.SECONDS)表示超过2秒还没有获取到线程返回值的话,就直接结束该get方法,继续主线程往下执行。

System.out.println("打印线程池返回值:" + ftask.get(2,TimeUnit.SECONDS));

进入到get方法,可以看到当状态在s <= COMPLETING时,表示任务还没有执行完,就会去执行awaitDone(false, 0L)方法,这个方法表示,将一直做死循环等待线程执行完成,才会跳出等待循环继续往下走。若设置了超时时间,例如ftask.get(2,TimeUnit.SECONDS)),就会在awaitDone方法循环至2秒,在2秒内发现线程状态被设置为正常完成时,就会跳出循环,若2秒后线程没有执行完成,也会强制跳出循环了,但这种情况将无法获取到线程结果值。

public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)//循环等待线程执行状态s = awaitDone(false, 0L);return report(s);
}

最后就是report(s)方法,可以看到outcome值最终赋值给Object x,若s==NORMAL表示线程任务已经正常完成结束,就可以根据我们定义的类型进行泛型转换返回,我们定义的是String字符串类型,故而会返回字符串值,也就是 “测试返回值”。

private V report(int s) throws ExecutionException {Object x = outcome;if (s == NORMAL)//返回线程任务执行结果return (V)x;if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);
}

你看,最后就能获取到了异步线程执行的结果返回给main主线程——

image

以上就是执行线程任务run方法后,如何将线程任务结果返回给主线程,其实,还少一个地方补充,就是如何将FutureTask任务丢给线程执行,我们这里用到了线程池, 但是execute(ftask)底层同样是使用一个了线程通过执行start方法开启一个线程,这个新运行的线程最终会执行FutureTask的run方法。

public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;
}

可以简单优化下,直接用一个线程演示该案例,这样看着更好理解些,当时,生产上是不会有这样直接用一个线程来执行的,更多是通过原生线程池——

public static void main(String[] args) throws Exception{Callable callable = new MyCallable();RunnableFuture<String> ftask = new FutureTask<String>(callable);new Thread(ftask).start();System.out.println("打印线程池返回值:" + ftask.get());
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/257029.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【mysql】下一行减去上一行数据、自增序列场景应用

背景 想获取if_yc为1连续账期数据 思路 获取所有if_yc为1的账期数据下一行减去上一行账期&#xff0c;如果为1则为连续&#xff0c;不等于1就为断档获取不等于1的最小账期&#xff0c;就是离当前账期最近连续账期 代码 以下为mysql语法&#xff1a; select acct_month f…

Vue配置代理解决跨域

Network的status中报CORS error指在前端&#xff08;Vue.js&#xff09;发起跨域请求时&#xff0c;被服务器拒绝访问的错误 在本地开发环境中&#xff0c;Vue.js 将默认从 http://localhost:8080 启动服务器。如果浏览器访问服务器时使用的 URL 不是该地址&#xff0c;就可能…

HarmonyOS 开发基础(二)Image

HarmonyOS 开发基础&#xff08;二&#xff09;Image Entry Component struct Index {// 创建一个状态变量 img 存储 img 网络地址State img: string https://img1.baidu.com/it/u4049022245,514596079&fm253&app138&sizew931&n0&fJPEG&fmtauto?sec1…

什么是类加载器?什么是双亲委派模型?

什么是类加载器&#xff0c;类加载器有哪些? 要想理解类加载器的话&#xff0c;务必要先清楚对于一个Java文件&#xff0c;它从编译到执行的整个过程。 类加载器&#xff1a;用于装载字节码文件(.class文件) 运行时数据区&#xff1a;用于分配存储空间 执行引擎&#xff1a;执…

V2X全方位通信部署产品支持智能交通建设!

来源&#xff1a;德思特测试测量丨德思特案例 | V2X全方位通信部署产品支持智能交通建设&#xff01; 原文链接&#xff1a;https://mp.weixin.qq.com/s/Fhnvcq9HA60Sed5BIGcnSw 欢迎关注虹科&#xff0c;为您提供最新资讯&#xff01; 01 案例背景 后疫情时代人们更注重于享…

USB总线驱动(二)设备驱动ch341以及serial

我们以一个usb设备&#xff08;ch341&#xff09;驱动例子来看下用法。 一、ch341设备驱动 如上&#xff0c;注册了一个ch341的驱动&#xff0c;根据指定的产品号和设备号&#xff0c;这个驱动将会适配3个类型的usb转串口设备。 module_usb_serial_driver最终转开成 标准的驱动…

正则表达式详细讲解

目录 一、正则表达式概念 二、八元素 1、普通字符&#xff1a; 2、元字符&#xff1a; 3、通配符 .&#xff1a; 4、字符类 []&#xff1a; 5、量词&#xff1a; 6、锚点 ^ 和 $&#xff1a; 7、捕获组 ()&#xff1a; 8、转义字符 \&#xff1a; 三、日常使用的正则…

算法 最小生成树

算法选择 稠密图&#xff1a;朴素版普利姆算法【因为代码短】 稀疏图&#xff1a;克鲁斯卡尔算法【因为思路简单】 普利姆&#xff08;Prim&#xff09; 朴素 Prim 时间复杂度 O(n^2) 适用情况 稠密图 算法流程 集合&#xff1a;当前已经在连通块中的所有点 初始化距…

水平自动扩容和缩容HPA;API资源对象NetworkPolicy;Kubernetes用户安全控制;Kubernetes创建普通用户示例

水平自动扩容和缩容HPA&#xff1b;API资源对象NetworkPolicy&#xff1b;Kubernetes用户安全控制&#xff1b;Kubernetes创建普通用户示例 水平自动扩容和缩容HPA&#xff08;本部分操作适合K8s版本>1.23.x) HPA全称是Horizontal Pod Autoscaler&#xff0c;翻译成中文是…

Windows 系统彻底卸载 SQL Server 通用方法

Windows 系统彻底卸载 SQL Server 通用方法 无论什么时候&#xff0c;SQL Server 的安装和卸载都是一件让我们头疼的事情。因为不管是 SQL Server 还是 MySQL 的数据库&#xff0c;当我们在使用数据库时因为未知原因出现问题&#xff0c;想要卸载重装时&#xff0c;如果数据库…

[LeetCode]-283. 移动零-1089. 复写零

目录 283. 移动零 描述 解析 代码 1089. 复写零 描述 解析 代码 283. 移动零 283. 移动零https://leetcode.cn/problems/move-zeroes/ 描述 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的相对顺序。 请注意 &…

【Linux--基础IO】

目录 一、系统文件接口1.1 open1.2 write1.3 read1.4 close 二、文件描述符三、文件描述符的分配规则四、重定向4.1输出重定向的原理4.2dup2函数的系统调用 五、缓冲区5.1代码及现象5.2原理解释5.3C语言FILE 六、文件系统6.1磁盘的介绍6.1磁盘的分区管理 7、软硬连接7.1软连接7…