Netty权威指南——基础篇3(AIO编程)备份

1 概述

        NIO 2.0 引入了新的异步通道概念,并提供了异步文件通道和异步套接字通道的实现。异步通道提供以下两种方式获取操作结果。

        1、通过java.util.concurrent.Future类来表示异步操作的结果;

        2、在执行异步操作的时候传入一个java.nio.channels

        CompletionHandler接口的实现类作为操作完成的回调。

        NIO2.0 的异步套接字通道是真正的异步非阻塞的时间驱动I/O(AIO)。它不需要通过多路复用器对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO的编程模型。

2 AIO创建TimeServer

        TimeServerAIO.java 代码如下:

public class TimeServerAIO {public static void main(String[] args) {int port = 8080;AsyncTimeServerHandler timeServerHandler = new AsyncTimeServerHandler(port);new Thread(timeServerHandler,"AIO-AsyncTimeServerHandler-001").start();}
}

        AsyncTimeServerHandler.java 代码如下:

public class AsyncTimeServerHandler implements Runnable{private int port;CountDownLatch latch;AsynchronousServerSocketChannel serverSocketChannel;public AsyncTimeServerHandler(int port){this.port = port;try {serverSocketChannel =  AsynchronousServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(port));System.out.println("TimeServer 启动,端口号是:"+port);}catch (IOException e){e.printStackTrace();}}@Overridepublic  void run(){latch = new CountDownLatch(1);doAccept();try {latch.await();}catch (InterruptedException e){e.printStackTrace();}}public void doAccept(){serverSocketChannel.accept(this,new AcceptCompletionHandler());}
}

        重点对AsyncTimeServerHandler.java分析。在构造方法中,首先创建一个异步的服务端通道AsynchronousServerSocketChannel,然后调用它的bind方法绑定监听端口。如果端口合法且没被占用,则绑定成功。

        在线程run方法中,对CountDownLatch对象进行初始化,它的作用是在完成一组正在执行的操作之后,允许当前的线程一直阻塞。在本例中,让线程在此阻塞,防止服务端执行完退出。在实际项目应用中,不需要启动独立的线程来处理AsynchronousServerSocketChannel。这里仅仅是个演示。

        doAccept方法用于接受客户端的连接,由于是异步操作,可以传递一个CompletionHandler<AsynchronousServerSocketChannel,? super A>类型的handler实例接收accept操作成功的通知消息。在本例中,通过AcceptCompletionHandler实例作为handle来接受通知消息。

        AcceptCompletionHandler.java 代码如下

public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler> {@Overridepublic void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {attachment.serverSocketChannel.accept(attachment,this);ByteBuffer buffer = ByteBuffer.allocate(1024);result.read(buffer,buffer,new ReadCompletionHandler(result));}@Overridepublic void failed(Throwable exc, AsyncTimeServerHandler attachment) {exc.printStackTrace();attachment.latch.countDown();}
}

        CompletionHandler.java有两个方法分别如下:

public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) 

        从attachment获取成员变量serverSocketChannel,然后继续调用它的accept方法。既然已经接收客户端成功了,为什么还要再次调用accept方法?原因如下:调AsynchronousSocketChannel的accept方法后,如果有新的客户端连接接入,系统将回调传入的CompletionHandler实例的completed方法,表示新的客户端已经接入成功。因为一个AsynchronousSocketChannel可以接收成千上万个客户端,所有需要继续调用accept方法,接收其他的客户端连接,最终形成一个循环。每当接收一个客户端连接成功之后,再异步接收新的客户端连接。

        链路建立成功之后,服务端需要接收客户端的请求消息,这次创建新的ByteBuffer,预分配1MB的缓冲区,再调用read方法进行异步读操作。

ReadCompletionHandler.java 代码如下
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {private AsynchronousSocketChannel channel;public ReadCompletionHandler(AsynchronousSocketChannel channel) {this.channel = channel;}@Overridepublic void completed(Integer result, ByteBuffer attachment) {attachment.flip();byte[] body = new byte[attachment.remaining()];attachment.get(body);try {String req = new String(body,"UTF-8");System.out.println("TimeServer 接收到:" + body);String currentTime = "query".equalsIgnoreCase(req) ?new Date(System.currentTimeMillis()).toString() : "bad";doWrite(currentTime);}catch (UnsupportedEncodingException e){e.printStackTrace();}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {try {this.channel.close();}catch (IOException e){e.printStackTrace();}}private void doWrite(String currentTime){if(currentTime != null && currentTime.trim().length() > 0){byte[] bytes = currentTime.getBytes(StandardCharsets.UTF_8);ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);writeBuffer.flip();channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer buffer) {//如果没有发送完成,继续发送if(buffer.hasRemaining()){channel.write(buffer,buffer,this);}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {try {channel.close();}catch (IOException e){e.printStackTrace();}}});}}
}

        首先看构造方法。将AsynchronousSocketChannel传递到ReadCompletionHandler中,当做成员变量来使用,主要用于读取半包消息和发送应答。

        completed()方法是读取到消息后的处理。首先对attachment进行flip操作,为后续从缓冲区读取数据做准备。根据缓冲区的可读字节数创建byte数据,然后通过new String方法创建请求消息,对请求消息进行判断,如果是“query”则获取当前系统服务器时间,调用doWrite方法发送给客户端。

3 AIO创建TimeClient

        TimeServerClientAIO.java 代码
public class TimeServerClientAIO {public static void main(String[] args) {int port = 8080;new Thread(new AsyncTimeClientHandler("127.0.0.1"),port);}
}

        通过一个独立的I/O线程创建异步时间服务器客户端Handler。在实际项目中,不需要独立的创建线程对象异步连接对象,因为底层都是通过JDK的系统回调实现的。

        AsyncTimeClientHandler.java 代码

import java.io.BufferedReader;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;/*** @author liujie* @create 2024-02-27 12:26*/
public class AsyncTimeClientHandler implements CompletionHandler<Void,AsyncTimeClientHandler> ,Runnable{private AsynchronousSocketChannel client;private String host;private int port;private CountDownLatch latch;public AsyncTimeClientHandler(String host, int port) {this.host = host;this.port = port;try {client = AsynchronousSocketChannel.open();}catch (IOException e){e.printStackTrace();}}@Overridepublic void run() {latch = new CountDownLatch(1);client.connect(new InetSocketAddress(host,port),this,this);try {latch.await();}catch (InterruptedException e){e.printStackTrace();}try {client.close();}catch (IOException e1){e1.printStackTrace();}}@Overridepublic void completed(Void result, AsyncTimeClientHandler attachment) {byte[] req = "query".getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);writeBuffer.put(req);writeBuffer.flip();client.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer buffer) {if(buffer.hasRemaining()){client.write(buffer,buffer,this);}else{ByteBuffer readBuffer = ByteBuffer.allocate(1024);client.read(readBuffer,readBuffer,new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer buffer) {buffer.flip();byte[] bytes = new byte[buffer.remaining()];buffer.get(bytes);String body;try {body = new String(bytes,"UTF-8");System.out.println("现在时间是:" + body);latch.countDown();}catch (UnsupportedEncodingException e){e.printStackTrace();}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {try {client.close();latch.countDown();}catch (IOException e){e.printStackTrace();}}});}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {try {client.close();latch.countDown();}catch (IOException e){e.printStackTrace();}}});}@Overridepublic void failed(Throwable exc, AsyncTimeClientHandler attachment) {exc.printStackTrace();try {client.close();latch.countDown();}catch (IOException e){e.printStackTrace();}}
}
代码分析
1、构造方法,首先通过AsynchronousSocketChannel的open方法创建一个新的AsynchronousSocketChannel对象。
2、run()方法:创建CountDownLatch进行等地啊,防止异步操作没有执行完成线程就退出。通过connect方法发起异步操作,它有两个参数,分别是:

        1、A attachment:AsynchronousSocketChannel的附件,用于回调通知时作为入参被传递,调用者可以自定义;

        2、CompletionHandler<Void,? super A> handler:异步操作回调通知接口,由调用者实现。在本例中,这两个参数都是用AsyncTimeClientHandler类本身,因为它实现了CompletionHandler接口。

3、completed()方法。异步连接成功之后的方法回调。创建请求消息体,对其进行编码,然后复制到发送缓冲区writeBuffer中,调用AsynchronousSocketChannel的write方法进行异步写。如果发送缓冲区中仍有尚未发送的字节,将继续异步发送,直到发送完毕,则执行异步读取操作。client.read()方法是客户端异步读取时间服务器应答消息的处理逻辑。首先调用read方法异步读取服务器的响应信息。由于read操作是异步的,所以通过内部匿名类实现CompletionHandler接口,当读取完成被JDK回调时,构造应答消息。

4、failed()方法。当读取发生异常时,关闭链路,同时调用CountDownLatch的countDown方法让AsyncTimeClientHandler线程执行完毕,客户端退出执行。

4 I/O对比

        4.1 异步非阻塞I/O

        很多人喜欢把JDK1.4提供的NIO框架称为异步非阻塞I/O,但是,如果严格按照UNIX网络编程模型和JDK的实现进行区分,实际上它只能被称为非阻塞I/O,不能叫异步非阻塞I/O。在早期的JDK 1.4和JDK 1.5中,JDK的Selector基于select/poll模型实现,它是基于I/O复用技术的非阻塞I/O,不是异步I/O。由JDK1.7提供了NIO 2.0新增了异步套接字通道,它是真正的异步I/O,在异步I/O操作的时候可以传递信号变量,当操作完成之后会回调相关的方法,异步I/O也称为AIO。

        4.2 多路复用器Selector

        在前面的章节介绍过Java NIO的实现关键是多路复用I/O技术,多路复用的核心就是通过Selector来轮询注册在其上的Channel,当发现某个或多个Channel处于就绪状态后,从阻塞状态返回就绪的Channel的选择键集合,进行I/O操作。

        4.3 伪异步I/O

        在通信线程和业务线程之间做个缓冲区,这个缓冲区用于隔离I/O线程和业务线程间的直接访问,这样业务线程就不会被I/O线程阻塞。而对于后端的业务侧来说,将消息或者Task放到线程池后就返回了,它不在直接访问I/O线程或者进行I/O读写,这样也就不会被同步阻塞。

        不同的I/O模型由于线程模型、API等差别很大,所以用法的差异也非常大。下图对它们进行功能对比:

           

        选择什么样的I/O模型或者NIO框架,要基于业务的实际应用场景和性能诉求,如果客户端并发连接数不多,周边对接的网元不多,服务器负载也不重,那就完全没必要选择NIO做服务器;如果是相反情况,就要考虑选择合适的NIO框架进行开发。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/496113.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

尚硅谷webpack5笔记2

Loader 原理 loader 概念 帮助 webpack 将不同类型的文件转换为 webpack 可识别的模块。 loader 执行顺序 分类pre: 前置 loadernormal: 普通 loaderinline: 内联 loaderpost: 后置 loader执行顺序4 类 loader 的执行优级为:pre > normal > inline > post 。相…

主流开发环境都有哪些?主流开发语言都有什么?

目录 一、简介&#xff1a; 二、主流开发环境&#xff1a; 三、主流开发语言&#xff1a; 四、结论&#xff1a; 一、简介&#xff1a; 在现代软件开发领域&#xff0c;选择适合自己需求的开发环境和开发语言至关重要。本文将介绍目前主流的开发环境和开发语言&#xff0c;…

【推荐算法系列十六】:协同过滤

文章目录 参考原理基于邻域的协同过滤算法基于用户的协同过滤&#xff08;User-Based Collaborative Filtering&#xff09;基于内容的协同过滤 基于模型的协同过滤算法 扩展优缺点 参考 推荐系统之神经协同过滤 原理 基于邻域的协同过滤算法 基于邻域的协同过滤算法又包括…

多源视频融合平台VMS/smarteye,免费的GB28181 server, 免费的RTMP推流server,RTSP server

海康、大华、宇视等网络摄像机IPcamera及DVR/NVR等多路设备走国标28181接入视频混合融合平台smarteye 第三方国标摄像头走GB28181接入视频融合平台VMS/smarteye&#xff0c; 平台已为设备预分配了SIP帐号&#xff0c;这样免去了找平台人员索要接入SIP帐号的麻烦&#xff0c;可…

粗读[JACS]:多种铂单原子物种的可逆转化和分布测定

摘要&#xff1a;在单原子催化剂&#xff08;SAC&#xff09;中&#xff0c;支撑锚定位点的复杂性创造了具有不同配位环境的大量单原子物种。迄今为止&#xff0c;给定 SAC 中这些不同单原子物种的数量分布仍然难以捉摸。最近&#xff0c;CeO2负载的金属SAC通过多种合成策略调节…

如何在 Linux 上使用 dmesg 命令

文章目录 1. Overview2.ring buffer怎样工作&#xff1f;3.dmesg命令4.移除sudo需求5. 强制彩色输出6.使用人性化的时间戳7.使用dmesg的人性化可读时间戳8.观察实时event9.检索最后10条消息10.搜索特定术语11.使用Log Levels12.使用Facility Categories13.Combining Facility a…

如何使用ETLCloud拉通金蝶云

一、ETLCloud集成组件 ETLCloud采用了一种创新的基于平台底座的理念。它通过将组件和平台进行分离&#xff0c;用户可以在平台上自行下载和安装所需的组件&#xff0c;而无需升级整个底座版本。这样用户就可以通过不断升级组件来增强数据集成平台的处理能力。同时&#xff0c;…

把 Windows 10/11 系统安装到内存上,会发生什么?速度快到爆炸!

把 Windows 10/11 系统安装到内存上&#xff0c;会发生什么&#xff1f;速度快到爆炸&#xff01; Primo Ramdisk 下载&#xff1a;https://www.freedidi.com/11637.html Primo 内存盘 强大的磁盘模拟器可创建超快 RAM 磁盘 1.超快访问、超低时延 RAM 磁盘是根据系统内存创建的…

【论文复现】——一种新的鲁棒三维点云平面拟合方法

目录 一、算法原理1、论文概述2、参考文献二、代码实现三、结果展示本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的GPT爬虫。 一、算法原理 1、论文概述 针对三维点云中的异常值和粗差点对平面拟合精度产生的影响,文章提出一…

LeetCode 刷题 [C++] 第240题.搜索二维矩阵 II

题目描述 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性&#xff1a; 每行的元素从左到右升序排列。 每列的元素从上到下升序排列。 题目分析 通过分析矩阵的特点发现&#xff0c;其左下角和右上角可以看作一个“二叉搜索树的根节…

安卓平板主板_安卓平板电脑主板MTK联发科|高通|紫光展锐方案

安卓平板电脑主板选择了MTK联发科方案&#xff0c;并且可以选配高通或者紫光展锐平台方案&#xff0c;为用户提供更强劲的性能和定制化的服务。主板搭载了联发科MT6771处理器&#xff0c;采用12nm制程工艺&#xff0c;拥有八核Cortex-A73Coretex-A53架构&#xff0c;主频为2.0G…

人脸识别技术不适用于哪些应用场景?

人脸识别技术在很多应用场景中都具有广泛的应用&#xff0c;但也存在一些场景下不适用的情况&#xff0c;包括&#xff1a; 1. 隐私保护&#xff1a;人脸识别技术涉及对个人敏感信息的收集和处理&#xff0c;可能对隐私构成潜在威胁。在一些需要高度保护个人隐私的场景&#x…