Netty学习——基础篇3(AIO编程)

1 概述

        NIO 2.0 引入了新的异步通道概念,并提供了异步文件通道和异步套接字通道的实现。异步通道提供以下两种方式获取操作结果。

        1、通过java.util.concurrent.Future类来表示异步操作的结果;

        2、在执行异步操作的时候传入一个java.nio.channels

        CompletionHandler接口的实现类作为操作完成的回调。

        NIO2.0 的异步套接字通道是真正的异步非阻塞的时间驱动I/O(AIO)。它不需要通过多路复用器对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO的编程模型。

2 AIO创建TimeServer

        TimeServerAIO.java 代码如下:

public class TimeServerAIO {public static void main(String[] args) {int port = 8080;AsyncTimeServerHandler timeServerHandler = new AsyncTimeServerHandler(port);new Thread(timeServerHandler,"AIO-AsyncTimeServerHandler-001").start();}
}

        AsyncTimeServerHandler.java 代码如下:

public class AsyncTimeServerHandler implements Runnable{private int port;CountDownLatch latch;AsynchronousServerSocketChannel serverSocketChannel;public AsyncTimeServerHandler(int port){this.port = port;try {serverSocketChannel =  AsynchronousServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(port));System.out.println("TimeServer 启动,端口号是:"+port);}catch (IOException e){e.printStackTrace();}}@Overridepublic  void run(){latch = new CountDownLatch(1);doAccept();try {latch.await();}catch (InterruptedException e){e.printStackTrace();}}public void doAccept(){serverSocketChannel.accept(this,new AcceptCompletionHandler());}
}

        重点对AsyncTimeServerHandler.java分析。在构造方法中,首先创建一个异步的服务端通道AsynchronousServerSocketChannel,然后调用它的bind方法绑定监听端口。如果端口合法且没被占用,则绑定成功。

        在线程run方法中,对CountDownLatch对象进行初始化,它的作用是在完成一组正在执行的操作之后,允许当前的线程一直阻塞。在本例中,让线程在此阻塞,防止服务端执行完退出。在实际项目应用中,不需要启动独立的线程来处理AsynchronousServerSocketChannel。这里仅仅是个演示。

        doAccept方法用于接受客户端的连接,由于是异步操作,可以传递一个CompletionHandler<AsynchronousServerSocketChannel,? super A>类型的handler实例接收accept操作成功的通知消息。在本例中,通过AcceptCompletionHandler实例作为handle来接受通知消息。

        AcceptCompletionHandler.java 代码如下

public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler> {@Overridepublic void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {attachment.serverSocketChannel.accept(attachment,this);ByteBuffer buffer = ByteBuffer.allocate(1024);result.read(buffer,buffer,new ReadCompletionHandler(result));}@Overridepublic void failed(Throwable exc, AsyncTimeServerHandler attachment) {exc.printStackTrace();attachment.latch.countDown();}
}

        CompletionHandler.java有两个方法分别如下:

public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) 

        从attachment获取成员变量serverSocketChannel,然后继续调用它的accept方法。既然已经接收客户端成功了,为什么还要再次调用accept方法?原因如下:调AsynchronousSocketChannel的accept方法后,如果有新的客户端连接接入,系统将回调传入的CompletionHandler实例的completed方法,表示新的客户端已经接入成功。因为一个AsynchronousSocketChannel可以接收成千上万个客户端,所有需要继续调用accept方法,接收其他的客户端连接,最终形成一个循环。每当接收一个客户端连接成功之后,再异步接收新的客户端连接。

        链路建立成功之后,服务端需要接收客户端的请求消息,这次创建新的ByteBuffer,预分配1MB的缓冲区,再调用read方法进行异步读操作。

ReadCompletionHandler.java 代码如下
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {private AsynchronousSocketChannel channel;public ReadCompletionHandler(AsynchronousSocketChannel channel) {this.channel = channel;}@Overridepublic void completed(Integer result, ByteBuffer attachment) {attachment.flip();byte[] body = new byte[attachment.remaining()];attachment.get(body);try {String req = new String(body,"UTF-8");System.out.println("TimeServer 接收到:" + body);String currentTime = "query".equalsIgnoreCase(req) ?new Date(System.currentTimeMillis()).toString() : "bad";doWrite(currentTime);}catch (UnsupportedEncodingException e){e.printStackTrace();}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {try {this.channel.close();}catch (IOException e){e.printStackTrace();}}private void doWrite(String currentTime){if(currentTime != null && currentTime.trim().length() > 0){byte[] bytes = currentTime.getBytes(StandardCharsets.UTF_8);ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);writeBuffer.flip();channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer buffer) {//如果没有发送完成,继续发送if(buffer.hasRemaining()){channel.write(buffer,buffer,this);}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {try {channel.close();}catch (IOException e){e.printStackTrace();}}});}}
}

        首先看构造方法。将AsynchronousSocketChannel传递到ReadCompletionHandler中,当做成员变量来使用,主要用于读取半包消息和发送应答。

        completed()方法是读取到消息后的处理。首先对attachment进行flip操作,为后续从缓冲区读取数据做准备。根据缓冲区的可读字节数创建byte数据,然后通过new String方法创建请求消息,对请求消息进行判断,如果是“query”则获取当前系统服务器时间,调用doWrite方法发送给客户端。

3 AIO创建TimeClient

        TimeServerClientAIO.java 代码
public class TimeServerClientAIO {public static void main(String[] args) {int port = 8080;new Thread(new AsyncTimeClientHandler("127.0.0.1"),port);}
}

        通过一个独立的I/O线程创建异步时间服务器客户端Handler。在实际项目中,不需要独立的创建线程对象异步连接对象,因为底层都是通过JDK的系统回调实现的。

        AsyncTimeClientHandler.java 代码

import java.io.BufferedReader;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;/*** @author liujie* @create 2024-02-27 12:26*/
public class AsyncTimeClientHandler implements CompletionHandler<Void,AsyncTimeClientHandler> ,Runnable{private AsynchronousSocketChannel client;private String host;private int port;private CountDownLatch latch;public AsyncTimeClientHandler(String host, int port) {this.host = host;this.port = port;try {client = AsynchronousSocketChannel.open();}catch (IOException e){e.printStackTrace();}}@Overridepublic void run() {latch = new CountDownLatch(1);client.connect(new InetSocketAddress(host,port),this,this);try {latch.await();}catch (InterruptedException e){e.printStackTrace();}try {client.close();}catch (IOException e1){e1.printStackTrace();}}@Overridepublic void completed(Void result, AsyncTimeClientHandler attachment) {byte[] req = "query".getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);writeBuffer.put(req);writeBuffer.flip();client.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer buffer) {if(buffer.hasRemaining()){client.write(buffer,buffer,this);}else{ByteBuffer readBuffer = ByteBuffer.allocate(1024);client.read(readBuffer,readBuffer,new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer buffer) {buffer.flip();byte[] bytes = new byte[buffer.remaining()];buffer.get(bytes);String body;try {body = new String(bytes,"UTF-8");System.out.println("现在时间是:" + body);latch.countDown();}catch (UnsupportedEncodingException e){e.printStackTrace();}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {try {client.close();latch.countDown();}catch (IOException e){e.printStackTrace();}}});}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {try {client.close();latch.countDown();}catch (IOException e){e.printStackTrace();}}});}@Overridepublic void failed(Throwable exc, AsyncTimeClientHandler attachment) {exc.printStackTrace();try {client.close();latch.countDown();}catch (IOException e){e.printStackTrace();}}
}
代码分析
1、构造方法,首先通过AsynchronousSocketChannel的open方法创建一个新的AsynchronousSocketChannel对象。
2、run()方法:创建CountDownLatch进行等地啊,防止异步操作没有执行完成线程就退出。通过connect方法发起异步操作,它有两个参数,分别是:

        1、A attachment:AsynchronousSocketChannel的附件,用于回调通知时作为入参被传递,调用者可以自定义;

        2、CompletionHandler<Void,? super A> handler:异步操作回调通知接口,由调用者实现。在本例中,这两个参数都是用AsyncTimeClientHandler类本身,因为它实现了CompletionHandler接口。

3、completed()方法。异步连接成功之后的方法回调。创建请求消息体,对其进行编码,然后复制到发送缓冲区writeBuffer中,调用AsynchronousSocketChannel的write方法进行异步写。如果发送缓冲区中仍有尚未发送的字节,将继续异步发送,直到发送完毕,则执行异步读取操作。client.read()方法是客户端异步读取时间服务器应答消息的处理逻辑。首先调用read方法异步读取服务器的响应信息。由于read操作是异步的,所以通过内部匿名类实现CompletionHandler接口,当读取完成被JDK回调时,构造应答消息。

4、failed()方法。当读取发生异常时,关闭链路,同时调用CountDownLatch的countDown方法让AsyncTimeClientHandler线程执行完毕,客户端退出执行。

4 I/O对比

        4.1 异步非阻塞I/O

        很多人喜欢把JDK1.4提供的NIO框架称为异步非阻塞I/O,但是,如果严格按照UNIX网络编程模型和JDK的实现进行区分,实际上它只能被称为非阻塞I/O,不能叫异步非阻塞I/O。在早期的JDK 1.4和JDK 1.5中,JDK的Selector基于select/poll模型实现,它是基于I/O复用技术的非阻塞I/O,不是异步I/O。由JDK1.7提供了NIO 2.0新增了异步套接字通道,它是真正的异步I/O,在异步I/O操作的时候可以传递信号变量,当操作完成之后会回调相关的方法,异步I/O也称为AIO。

        4.2 多路复用器Selector

        在前面的章节介绍过Java NIO的实现关键是多路复用I/O技术,多路复用的核心就是通过Selector来轮询注册在其上的Channel,当发现某个或多个Channel处于就绪状态后,从阻塞状态返回就绪的Channel的选择键集合,进行I/O操作。

        4.3 伪异步I/O

        在通信线程和业务线程之间做个缓冲区,这个缓冲区用于隔离I/O线程和业务线程间的直接访问,这样业务线程就不会被I/O线程阻塞。而对于后端的业务侧来说,将消息或者Task放到线程池后就返回了,它不在直接访问I/O线程或者进行I/O读写,这样也就不会被同步阻塞。

        不同的I/O模型由于线程模型、API等差别很大,所以用法的差异也非常大。下图对它们进行功能对比:

           

        选择什么样的I/O模型或者NIO框架,要基于业务的实际应用场景和性能诉求,如果客户端并发连接数不多,周边对接的网元不多,服务器负载也不重,那就完全没必要选择NIO做服务器;如果是相反情况,就要考虑选择合适的NIO框架进行开发。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/509091.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024年阿里云最便宜云服务器价格:61元、165元、99元、199元

2024阿里云服务器优惠活动政策整理&#xff0c;阿里云99计划ECS云服务器2核2G3M带宽99元一年、2核4G5M优惠价格199元一年&#xff0c;轻量应用服务器2核2G3M服务器61元一年、2核4G4M带宽165元1年&#xff0c;云服务器4核16G10M带宽26元1个月、149元半年&#xff0c;云服务器8核…

【Micropython基础】TCP客户端与服务器

文章目录 前言一、连接Wifi1.1 创建STA接口1.2 激活wifi接口1.3 连接WIFI1.4 判断WIFI是否连接1.5 连接WIFI总体代码 二、创建TCP 客户端2.1 创建套接字2.2 设置TCP服务器的ip地址和端口2.3 连接TCP服务器2.3 发送数据2.4 接收数据2.5 断开连接2.6 示例代码 三、TCP服务器的创建…

深入了解手机App开发:从构思到上线的全过程

引言 在当今数字化时代&#xff0c;手机App已经成为人们日常生活中不可或缺的一部分&#xff0c;深刻地改变了我们的交流、工作、娱乐和购物方式。随着智能手机的普及&#xff0c;手机App的重要性愈发凸显&#xff0c;它们不仅为用户提供了便捷的工具和娱乐方式&#xff0c;还…

Qt/C++音视频开发67-保存裸流加入sps/pps信息/支持264/265裸流/转码保存/拉流推流

一、前言 音视频组件除了支持保存MP4文件外&#xff0c;同时还支持保存裸流即264/265文件&#xff0c;以及解码后最原始的yuv文件。在实际使用过程中&#xff0c;会发现部分视频文件保存的裸流文件&#xff0c;并不能直接用播放器播放&#xff0c;查阅资料得知原来是缺少sps/p…

Unity 脚本-生命周期常用函数

在Unity中&#xff0c;万物皆是由组件构成的。 右键创建C&#xff03;脚本&#xff0c;拖动脚本到某物体的组件列表。 生命周期相关函数 using System.Collections; using System.Collections.Generic; using UnityEngine;// 必须要继承 MonoBehaviour 才是一个组件 // 类名…

【Redis】Redis 实现分布式Session

Cookie 保存在客户端浏览器中&#xff0c;而 Session 保存在服务器上。客户端浏览器访问服务器的时候&#xff0c;服务器把客户端信息以某种形式记录在服务器上&#xff0c;这就是 Session。客户端浏览器再次访问时只需要从该 Session 中查找该客户的状态就可以了。 在实际工作…

【全志D1-H 哪吒开发板】Debian系统安装调教和点灯指南

全志D1-H开发板【哪吒】使用Deabian系统入门 特别说明&#xff1a; 因为涉及到操作较多&#xff0c;博文可能会导致格式丢失 其中内容&#xff0c;会根据后续使用做优化调整 目录&#xff1a; 参考资料固件烧录启动调教点灯问题 〇、参考资料 官方资料 开发板-D1开发板【…

rtt的io设备框架面向对象学习-io设备管理层

目录 1.设备基类2.rtt基类2.1 rtt基类定义2.2 对象容器定义2.3 rtt基类构造函数 3.io设备管理接口4.总结 这层我的理解就是rtt基类和设备基类所在&#xff0c;所以抽离出来好点&#xff0c;不然每个设备类都要重复它。 诺&#xff0c;rtt基类和设备基类如下对象图&#xff0c;这…

【嵌入式——QT】QTreeWidget

QTreeWidget类是创建和管理目录树结构的类&#xff0c;QTreeWidget每一个节点都是一个QTreeWidgetItem对象&#xff0c;添加一个节点前需先创建。QTreeWidget类是一个便利类&#xff0c;它提供了一个标准的树widget&#xff0c;具有经典的基于item的界面&#xff0c;类似于Qt 3…

如何理解工程管理,与项目管理的区别与联系?

如何理解工程管理&#xff0c;与项目管理的区别与联系&#xff1f; 首先&#xff0c;项目管理并不是工程管理的子集&#xff0c;大家可能混淆了另一个“工程项目管理”的概念。 工程项目管理模板一键安装&#xff0c;进入链接即可查看和使用&#xff1a;https://www.jiandaoyu…

Running job: job_1709516801756_0003

** yarn运行卡在Running job: job_1709516801756_0003问题解决&#xff1a; ** 在运行wordcount时出现错误&#xff0c;一直卡住 运行命令&#xff1a;hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount /input /output 出现错误&#xff1a…

“智农”-农业物联网可视化

大棚可视化|设施农业可视化|农业元宇宙|农业数字孪生|大棚物联网|大棚数字孪生|农业一体化管控平台|智慧农业可视化|智农|农业物联网可视化|农业物联网数字孪生|智慧农业|大棚三维可视化|智慧大棚可视化|智慧大棚|农业智慧园区|数字农业|数字大棚|农业大脑|智慧牧业数字孪生|智…