Netty—FuturePromise

Netty—Future&Promise

  • 一、JDK原生 Future
  • 二、Netty包下的 Future
  • 三、Promise
    • 1、使用Promise同步获取结果
    • 2、使用Promise异步获取结果
    • .3、使用Promise同步获取异常 - sync & get
    • 4、使用Promise同步获取异常 - await
    • 5、使用Promise异步获取异常

在异步处理时,经常用到这两个接口

首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展。

在这里插入图片描述

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果;
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束;
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。
功能/名称jdk Futurenetty FuturePromise
cancel取消任务--
isCanceled任务是否取消--
isDone任务是否完成,不能区分成功失败--
get获取任务结果,阻塞等待--
getNow-获取任务结果,非阻塞,还未产生结果时返回 null-
await-等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断-
sync-等待任务结束,如果任务失败,抛出异常-
isSuccess-判断任务是否成功-
isCancellable-是否可以取消执行
cause-获取失败信息,非阻塞,如果没有失败,返回null-
addLinstener-添加回调,异步接收结果-
removeListener-删除回调,异步接收结果
setSuccess--设置成功结果
setFailure--设置失败结果

一、JDK原生 Future

关于 java.util.concurrent包下的Future 接口,我想大家应该都很熟悉,用得最多的就是在使用 Java 的线程池 ThreadPoolExecutor 的时候了。在 submit 一个任务到线程池中的时候,返回的就是一个 Future 实例,通过它来获取提交的任务的执行状态和最终的执行结果,我们最常用它的 isDone()get() 方法。

// 尝试取消执行此任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否在正常执行完成之前取消
boolean isCancelled();
// 任务是否完成,完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法都将返回true
boolean isDone();
// 阻塞获取执行结果
V get() throws InterruptedException, ExecutionException;
// 阻塞获取执行结果,指定超时时间
V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;

接下来,演示一下使用jdk原生Future获取执行结果~

@Slf4j
public class JdkFutureTest01 {public static void main(String[] args) {// 线程池ExecutorService service = newFixedThreadPool(2);// 提交任务Future<Object> future = service.submit(new Callable<Object>() {@Overridepublic Object call() throws Exception {log.info("执行计算");Thread.sleep(1000);return 50;}});try {System.out.println(future.get());service.shutdownNow();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}
}

二、Netty包下的 Future

原生的Future功能比较有限,Netty扩展了Future并增加了以下方法:

// 判断任务是否成功
boolean isSuccess();
// 判断是否可以取消执行
boolean isCancellable();
// 获取失败的信息
Throwable cause();
// 添加回调方法,异步接收结果
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
// 添加多个回调方法
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 删除回调方法,异步接收结果
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
// 删除多个回调方法
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 等待任务结束,如果任务失败,抛出异常
Future<V> sync() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> syncUninterruptibly();
// 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断
Future<V> await() throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
Future<V> awaitUninterruptibly();
// 等待该future在指定的时间限制内完成。
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
// 等待该future在指定的时间限制内完成。
boolean await(long timeoutMillis) throws InterruptedException;
// 同上,区别是不可中断阻塞等待过程
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
// 同上,区别是不可中断阻塞等待过程
boolean awaitUninterruptibly(long timeoutMillis);
// 获取任务结果,非阻塞,还未产生结果时返回 null
V getNow();

通过以上扩展的方法我们可以发现,Netty的Future增加了 sync()await() 方法用于阻塞等待,还提供了 addListener() 方法用于添加回调方法,异步接收结果。

sync() 方法内部会先调用 await() 方法,等待 await() 方法返回后,会检查该任务是否失败,如果失败则将失败的异常抛出来。即使用await()方法等待任务结束,如果任务失败,不会抛异常,而是需要通过 isSuccess 判断。然而 sync() 方法是直接抛出异常!

@Override
public Promise<V> sync() throws InterruptedException {await();rethrowIfFailed();return this;
}
private void rethrowIfFailed() {Throwable cause = cause();if (cause == null) {return;}PlatformDependent.throwException(cause);
}

接下来,演示一下使用Netty包下的Future获取执行结果~

@Slf4j
public class NettyFutureTest01 {public static void main(String[] args) throws InterruptedException {NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();EventLoop eventLoop = eventLoopGroup.next();Future<Integer> future = eventLoop.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {log.info("执行计算");Thread.sleep(1000);return 66;}});// 阻塞等待future.sync();log.info("收到结果{}", future.getNow());eventLoopGroup.shutdownGracefully();}
}

又或者使用 addListener() 方法用于添加回调方法,异步接收结果。

future.addListener(new GenericFutureListener<Future<? super Integer>>() {@Overridepublic void operationComplete(Future<? super Integer> future) throws Exception {log.info("收到结果{}", future.getNow());eventLoopGroup.shutdownGracefully();}
});

三、Promise

Future支持阻塞等待、添加回调方法、判断执行状态等,而Promise主要是支持状态设置相关方法。当底层I/O操作通过Promise改变执行状态,我们可以通过同步等待的Future立即得到结果。

// 设置成功结果并回调
Promise<V> setSuccess(V result);
// 同上,区别是是否报错
boolean trySuccess(V result);
// 设置失败异常并回调
Promise<V> setFailure(Throwable cause);
// 同上,区别是是否报错
boolean tryFailure(Throwable cause);
// 设置为不可取消状态
boolean setUncancellable();

可见,Promise作为一个特殊的Future,只是增加了一些状态设置方法。所以它常用于传入I/O业务代码中,用于I/O结束后设置成功(或失败)状态,并回调方法。以下是DefaultPromise的继承关系:
在这里插入图片描述

设置promise的状态其实就是原子地修改result字段为传入的执行结果。值得注意的是,result字段带有volatile关键字来确保多线程之间的可见性。另外,设置完毕状态后,会尝试唤醒所有在阻塞等待该promise返回结果的线程。

// result 字段的原子更新器
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
// 缓存执行结果的字段
private volatile Object result;
// Promise所在的线程
private final EventExecutor executor;
// 一个或多个回调方法
private Object listeners;
// 阻塞线程数量计数器
private short waiters;@Override
public Promise<V> setSuccess(V result) {if (setSuccess0(result)) {return this;}throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {// 原子修改result字段为 objResultif (RESULT_UPDATER.compareAndSet(this, null, objResult) ||RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {if (checkNotifyWaiters()) {notifyListeners();}return true;}return false;
}
private synchronized boolean checkNotifyWaiters() {if (waiters > 0) {// 唤醒其他等待线程notifyAll();}return listeners != null;
}

1、使用Promise同步获取结果

@Slf4j
public class PromiseDemo01 {public static void main(String[] args) throws ExecutionException, InterruptedException {DefaultEventLoop eventLoop = new DefaultEventLoop();Promise<Integer> promise = new DefaultPromise<>(eventLoop);eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}log.info("set success");promise.setSuccess(10);});log.info("start...");log.info("promise.getNow():{}" , promise.getNow());log.info("promise.get():{}" , promise.get());}
}

2、使用Promise异步获取结果

@Slf4j
public class PromiseDemo03 {public static void main(String[] args) throws ExecutionException, InterruptedException {DefaultEventLoop eventLoop = new DefaultEventLoop();DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception = new RuntimeException("error....hh");log.debug("set failure,e: {}", exception.getMessage());promise.setFailure(exception);});log.info("start");log.info("promise.getNow():{}" , promise.getNow());log.info("promise.get():{}" , promise.get());}
}

.3、使用Promise同步获取异常 - sync & get

Slf4j
public class PromiseDemo03 {public static void main(String[] args) throws ExecutionException, InterruptedException {DefaultEventLoop eventLoop = new DefaultEventLoop();DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception = new RuntimeException("error....hh");log.debug("set failure,e: {}", exception.getMessage());promise.setFailure(exception);});log.info("start");log.info("promise.getNow():{}" , promise.getNow());log.info("promise.get():{}" , promise.get());}
}

4、使用Promise同步获取异常 - await

@Slf4j
public class PromiseDemo04 {public static void main(String[] args) throws InterruptedException, ExecutionException {DefaultEventLoop eventLoop = new DefaultEventLoop();Promise<Integer> promise = new DefaultPromise<>(eventLoop);eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception = new RuntimeException("error....hh");log.info("set failure,e: {}", exception.getMessage());promise.setFailure(exception);});log.info("start");log.info("promise.getNow():{}" , promise.getNow());promise.await();if (promise.isSuccess()) {log.info("{}", promise.getNow());} else {log.error("{}", promise.cause().toString());}}
}

5、使用Promise异步获取异常

@Slf4j
public class PromiseDemo05 {public static void main(String[] args) throws InterruptedException, ExecutionException {DefaultEventLoop eventLoop = new DefaultEventLoop();Promise<Integer> promise = new DefaultPromise<>(eventLoop);promise.addListener(future -> {if (promise.isSuccess()) {log.info("{}", promise.getNow());} else {log.error("{}", promise.cause().toString());}});eventLoop.execute(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}RuntimeException exception = new RuntimeException("error....hh");log.info("set failure,e: {}", exception.getMessage());promise.setFailure(exception);});log.info("start");log.info("promise.getNow():{}" , promise.getNow());}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/98694.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

公信力不是儿戏:政府与非营利组织如何利用爱校对提升信息质量

公信力是政府和非营利组织成功的基础&#xff0c;而这种公信力大多来源于对外发布的信息的准确性和可靠性。在这个方面&#xff0c;“爱校对”展现了它的强大能力和实用性。以下我们将具体探讨这两种组织如何通过使用爱校对来提升他们的信息质量。 政府&#xff1a;公开与透明&…

基于Spring Boot的企业门户网站设计与实现(Java+spring boot+MySQL)

获取源码或者论文请私信博主 演示视频&#xff1a; 基于Spring Boot的企业门户网站设计与实现&#xff08;Javaspring bootMySQL&#xff09; 使用技术&#xff1a; 前端&#xff1a;html css javascript jQuery ajax thymeleaf 微信小程序 后端&#xff1a;Java springboot…

【TypeScript学习】—面向对象(四)

【TypeScript学习】—面向对象&#xff08;四&#xff09; 一、面向对象 二、类 三、构造方法 class Dog{name:string;age:number;//构造函数constructor(name:string,age:number){this.namename;this.ageage;}bark(){//在方法中可以通过this来表示当前调用方法的对象//this表…

使用命令行创建仓库

如果你还没有任何代码&#xff0c;可以通过命令行工具创建一个全新的Git仓库并初始化到本项目仓库中。 git clone https://e.coding.net/***/neurosens.git cd neurosens echo "# neurosens" >> README.md git add README.md git commit -m "first commi…

简单的爬虫代码 爬(豆瓣电影)

路漫漫其修远兮&#xff0c;吾将上下而求索 这次写一个最简单的python爬虫代码&#xff0c;也是大多教程第一次爬取的&#xff0c;代码里面有个别的简单介绍&#xff0c;希望能加深您对python爬虫的理解。 本次爬取两个网页数据 一 爬取的网站 豆瓣电影 爬取网页中的&#…

windows查看端口占用,通过端口找进程号(查找进程号),通过进程号定位应用名(查找应用)(netstat、tasklist)

文章目录 通过端口号查看进程号netstat通过进程号定位应用程序tasklist 通过端口号查看进程号netstat 在Windows系统中&#xff0c;可以使用 netstat 命令来查看端口的占用情况。以下是具体的步骤&#xff1a; 打开命令提示符&#xff08;CMD&#xff09;&#xff1a;按WinR组…

鲁棒优化入门(6)—Matlab+Yalmip两阶段鲁棒优化通用编程指南(上)

0.引言 上一篇博客介绍了使用Yalmip工具箱求解单阶段鲁棒优化的方法。这篇文章将和大家一起继续研究如何使用Yalmip工具箱求解两阶段鲁棒优化(默认看到这篇博客时已经有一定的基础了&#xff0c;如果没有可以看看我专栏里的其他文章)。关于两阶段鲁棒优化与列与约束生成算法的原…

Mybatis 动态SQL – 使用choose标签动态生成条件语句

之前我们介绍了if,where标签的使用&#xff1b;本篇我们需要在if,where标签的基础上介绍如何使用Mybatis提供的choose标签动态生成条件语句。 如果您对if,where标签动态生成条件语句不太了解&#xff0c;建议您先进行了解后再阅读本篇&#xff0c;可以参考&#xff1a; Mybat…

uniapp从零到一的学习商城实战

涵盖的功能&#xff1a; 安装开发工具HBuilder&#xff1a;HBuilderX-高效极客技巧 创建项目步骤&#xff1a; 1.右键-项目&#xff1a; 2.选择vue2和默认模板&#xff1a; 3.完整的项目目录&#xff1a; 微信开发者工具调试&#xff1a; 1.安装微信开发者工具 2.打开…

MySQL访问和配置

目录 1.使用MySQL自带的客户端工具访问 2.使用DOS访问(命令行窗口WinR → cmd) 3.连接工具&#xff08;SQLyog或其它&#xff09; MySQL从小白到总裁完整教程目录:https://blog.csdn.net/weixin_67859959/article/details/129334507?spm1001.2014.3001.5502 1.使用MySQL自…

行测图形推理规律(一)元素组成

题库&#xff1a;粉笔网题库 (fenbi.com) 不知道和测评的行测题库是不是一样的&#xff0c;但是总结的规律应该是一样的。 规律并不唯一&#xff0c;题库的答案也只是参考答案&#xff0c;切勿当杠精&#xff0c;你觉得你的规律更合适就别管。本人所归纳的规律仅代表本人想法…

23. 带旋转的数独游戏

题目 Description 数独是一个基于逻辑的组合数字放置拼图&#xff0c;在世界各地都很受欢迎。 在这个问题上&#xff0c;让我们关注 网格的拼图&#xff0c;其中包含 个区域。 目标是用十六进制数字填充整个网格&#xff0c;即 &#xff0c;以便每列&#xff0c;每行和每个区…