1 概述
NIO 2.0 引入了新的异步通道概念,并提供了异步文件通道和异步套接字通道的实现。异步通道提供以下两种方式获取操作结果。
1、通过java.util.concurrent.Future类来表示异步操作的结果;
2、在执行异步操作的时候传入一个java.nio.channels
CompletionHandler接口的实现类作为操作完成的回调。
NIO2.0 的异步套接字通道是真正的异步非阻塞的时间驱动I/O(AIO)。它不需要通过多路复用器对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO的编程模型。
2 AIO创建TimeServer
TimeServerAIO.java 代码如下:
public class TimeServerAIO {public static void main(String[] args) {int port = 8080;AsyncTimeServerHandler timeServerHandler = new AsyncTimeServerHandler(port);new Thread(timeServerHandler,"AIO-AsyncTimeServerHandler-001").start();}
}
AsyncTimeServerHandler.java 代码如下:
public class AsyncTimeServerHandler implements Runnable{private int port;CountDownLatch latch;AsynchronousServerSocketChannel serverSocketChannel;public AsyncTimeServerHandler(int port){this.port = port;try {serverSocketChannel = AsynchronousServerSocketChannel.open();serverSocketChannel.bind(new InetSocketAddress(port));System.out.println("TimeServer 启动,端口号是:"+port);}catch (IOException e){e.printStackTrace();}}@Overridepublic void run(){latch = new CountDownLatch(1);doAccept();try {latch.await();}catch (InterruptedException e){e.printStackTrace();}}public void doAccept(){serverSocketChannel.accept(this,new AcceptCompletionHandler());}
}
重点对AsyncTimeServerHandler.java分析。在构造方法中,首先创建一个异步的服务端通道AsynchronousServerSocketChannel,然后调用它的bind方法绑定监听端口。如果端口合法且没被占用,则绑定成功。
在线程run方法中,对CountDownLatch对象进行初始化,它的作用是在完成一组正在执行的操作之后,允许当前的线程一直阻塞。在本例中,让线程在此阻塞,防止服务端执行完退出。在实际项目应用中,不需要启动独立的线程来处理AsynchronousServerSocketChannel。这里仅仅是个演示。
doAccept方法用于接受客户端的连接,由于是异步操作,可以传递一个CompletionHandler<AsynchronousServerSocketChannel,? super A>类型的handler实例接收accept操作成功的通知消息。在本例中,通过AcceptCompletionHandler实例作为handle来接受通知消息。
AcceptCompletionHandler.java 代码如下
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler> {@Overridepublic void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {attachment.serverSocketChannel.accept(attachment,this);ByteBuffer buffer = ByteBuffer.allocate(1024);result.read(buffer,buffer,new ReadCompletionHandler(result));}@Overridepublic void failed(Throwable exc, AsyncTimeServerHandler attachment) {exc.printStackTrace();attachment.latch.countDown();}
}
CompletionHandler.java有两个方法分别如下:
public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment)
从attachment获取成员变量serverSocketChannel,然后继续调用它的accept方法。既然已经接收客户端成功了,为什么还要再次调用accept方法?原因如下:调AsynchronousSocketChannel的accept方法后,如果有新的客户端连接接入,系统将回调传入的CompletionHandler实例的completed方法,表示新的客户端已经接入成功。因为一个AsynchronousSocketChannel可以接收成千上万个客户端,所有需要继续调用accept方法,接收其他的客户端连接,最终形成一个循环。每当接收一个客户端连接成功之后,再异步接收新的客户端连接。
链路建立成功之后,服务端需要接收客户端的请求消息,这次创建新的ByteBuffer,预分配1MB的缓冲区,再调用read方法进行异步读操作。
ReadCompletionHandler.java 代码如下
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {private AsynchronousSocketChannel channel;public ReadCompletionHandler(AsynchronousSocketChannel channel) {this.channel = channel;}@Overridepublic void completed(Integer result, ByteBuffer attachment) {attachment.flip();byte[] body = new byte[attachment.remaining()];attachment.get(body);try {String req = new String(body,"UTF-8");System.out.println("TimeServer 接收到:" + body);String currentTime = "query".equalsIgnoreCase(req) ?new Date(System.currentTimeMillis()).toString() : "bad";doWrite(currentTime);}catch (UnsupportedEncodingException e){e.printStackTrace();}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {try {this.channel.close();}catch (IOException e){e.printStackTrace();}}private void doWrite(String currentTime){if(currentTime != null && currentTime.trim().length() > 0){byte[] bytes = currentTime.getBytes(StandardCharsets.UTF_8);ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);writeBuffer.flip();channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer buffer) {//如果没有发送完成,继续发送if(buffer.hasRemaining()){channel.write(buffer,buffer,this);}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {try {channel.close();}catch (IOException e){e.printStackTrace();}}});}}
}
首先看构造方法。将AsynchronousSocketChannel传递到ReadCompletionHandler中,当做成员变量来使用,主要用于读取半包消息和发送应答。
completed()方法是读取到消息后的处理。首先对attachment进行flip操作,为后续从缓冲区读取数据做准备。根据缓冲区的可读字节数创建byte数据,然后通过new String方法创建请求消息,对请求消息进行判断,如果是“query”则获取当前系统服务器时间,调用doWrite方法发送给客户端。
3 AIO创建TimeClient
TimeServerClientAIO.java 代码
public class TimeServerClientAIO {public static void main(String[] args) {int port = 8080;new Thread(new AsyncTimeClientHandler("127.0.0.1"),port);}
}
通过一个独立的I/O线程创建异步时间服务器客户端Handler。在实际项目中,不需要独立的创建线程对象异步连接对象,因为底层都是通过JDK的系统回调实现的。
AsyncTimeClientHandler.java 代码
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;/*** @author liujie* @create 2024-02-27 12:26*/
public class AsyncTimeClientHandler implements CompletionHandler<Void,AsyncTimeClientHandler> ,Runnable{private AsynchronousSocketChannel client;private String host;private int port;private CountDownLatch latch;public AsyncTimeClientHandler(String host, int port) {this.host = host;this.port = port;try {client = AsynchronousSocketChannel.open();}catch (IOException e){e.printStackTrace();}}@Overridepublic void run() {latch = new CountDownLatch(1);client.connect(new InetSocketAddress(host,port),this,this);try {latch.await();}catch (InterruptedException e){e.printStackTrace();}try {client.close();}catch (IOException e1){e1.printStackTrace();}}@Overridepublic void completed(Void result, AsyncTimeClientHandler attachment) {byte[] req = "query".getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);writeBuffer.put(req);writeBuffer.flip();client.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer buffer) {if(buffer.hasRemaining()){client.write(buffer,buffer,this);}else{ByteBuffer readBuffer = ByteBuffer.allocate(1024);client.read(readBuffer,readBuffer,new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer result, ByteBuffer buffer) {buffer.flip();byte[] bytes = new byte[buffer.remaining()];buffer.get(bytes);String body;try {body = new String(bytes,"UTF-8");System.out.println("现在时间是:" + body);latch.countDown();}catch (UnsupportedEncodingException e){e.printStackTrace();}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {try {client.close();latch.countDown();}catch (IOException e){e.printStackTrace();}}});}}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {try {client.close();latch.countDown();}catch (IOException e){e.printStackTrace();}}});}@Overridepublic void failed(Throwable exc, AsyncTimeClientHandler attachment) {exc.printStackTrace();try {client.close();latch.countDown();}catch (IOException e){e.printStackTrace();}}
}
代码分析 1、构造方法,首先通过AsynchronousSocketChannel的open方法创建一个新的AsynchronousSocketChannel对象。 2、run()方法:创建CountDownLatch进行等地啊,防止异步操作没有执行完成线程就退出。通过connect方法发起异步操作,它有两个参数,分别是:
1、A attachment:AsynchronousSocketChannel的附件,用于回调通知时作为入参被传递,调用者可以自定义;
2、CompletionHandler<Void,? super A> handler:异步操作回调通知接口,由调用者实现。在本例中,这两个参数都是用AsyncTimeClientHandler类本身,因为它实现了CompletionHandler接口。
3、completed()方法。异步连接成功之后的方法回调。创建请求消息体,对其进行编码,然后复制到发送缓冲区writeBuffer中,调用AsynchronousSocketChannel的write方法进行异步写。如果发送缓冲区中仍有尚未发送的字节,将继续异步发送,直到发送完毕,则执行异步读取操作。client.read()方法是客户端异步读取时间服务器应答消息的处理逻辑。首先调用read方法异步读取服务器的响应信息。由于read操作是异步的,所以通过内部匿名类实现CompletionHandler接口,当读取完成被JDK回调时,构造应答消息。
4、failed()方法。当读取发生异常时,关闭链路,同时调用CountDownLatch的countDown方法让AsyncTimeClientHandler线程执行完毕,客户端退出执行。
4 I/O对比
4.1 异步非阻塞I/O
很多人喜欢把JDK1.4提供的NIO框架称为异步非阻塞I/O,但是,如果严格按照UNIX网络编程模型和JDK的实现进行区分,实际上它只能被称为非阻塞I/O,不能叫异步非阻塞I/O。在早期的JDK 1.4和JDK 1.5中,JDK的Selector基于select/poll模型实现,它是基于I/O复用技术的非阻塞I/O,不是异步I/O。由JDK1.7提供了NIO 2.0新增了异步套接字通道,它是真正的异步I/O,在异步I/O操作的时候可以传递信号变量,当操作完成之后会回调相关的方法,异步I/O也称为AIO。
4.2 多路复用器Selector
在前面的章节介绍过Java NIO的实现关键是多路复用I/O技术,多路复用的核心就是通过Selector来轮询注册在其上的Channel,当发现某个或多个Channel处于就绪状态后,从阻塞状态返回就绪的Channel的选择键集合,进行I/O操作。
4.3 伪异步I/O
在通信线程和业务线程之间做个缓冲区,这个缓冲区用于隔离I/O线程和业务线程间的直接访问,这样业务线程就不会被I/O线程阻塞。而对于后端的业务侧来说,将消息或者Task放到线程池后就返回了,它不在直接访问I/O线程或者进行I/O读写,这样也就不会被同步阻塞。
不同的I/O模型由于线程模型、API等差别很大,所以用法的差异也非常大。下图对它们进行功能对比:
选择什么样的I/O模型或者NIO框架,要基于业务的实际应用场景和性能诉求,如果客户端并发连接数不多,周边对接的网元不多,服务器负载也不重,那就完全没必要选择NIO做服务器;如果是相反情况,就要考虑选择合适的NIO框架进行开发。