JAVA的NIO和BIO底层原理分析

文章目录

  • 一、操作系统底层IO原理
    • 1. 简介
    • 2. 操作系统进行IO的流程
  • 二、BIO底层原理
    • 1. 什么是Socket
    • 2. JDK原生编程的BIO
  • 三、Java原生编程的NIO
    • 1. 简介
    • 2. NIO和BIO的主要区别
    • 3. Reactor模式
    • 4. NIO的三大核心组件
    • 5. NIO核心源码分析

一、操作系统底层IO原理

1. 简介

IO,即Input/Output,指的是输入和输出。在计算机科学中,IO描述的是数据在内部存储器和外部存储器或其他周边设备之间的流动过程,既包括数据从外部复制到内存(输入),也包括数据从内存复制到外部(输出)。IO是计算机与外界交互的过程,涉及到的对象可以是人或其他设备,如文件、管道、网络、命令行、信号等,更广义地讲,I/O指代任何操作系统理解为“文件”的事务。此外,IO也是操作系统中的一个核心概念,在各种系统中都有重要地位,例如在本机、传统的单体应用、分布式系统中。IO操作可以有多种方式,如DIO(Direct I/O)、AIO(Asynchronous I/O,异步I/O)、Memory-Mapped I/O(内存映射I/O)等,不同的I/O方式有不同的实现方式和性能,适用于不同的应用场景

2. 操作系统进行IO的流程

首先我们需要了机计算机网络的协议栈,这里有两种分别是OSI参考模型和TCP/IP五层模型,在实际中通常使用到的只有TCP/IP五层模型,因为OSI参考模型实现过于复杂。

在这里插入图片描述
现在我们大致看一下数据是如何从一个计算机传递到另一个计算机的,假如张三向李四发送了一条你好的消息,大致会经过一下过程:

  1. 首先应用程序会进行编码处理将字符消息转化为二进制流,然后交给传输层(此时产生的数据包类型为报文)
  2. TCP 根据应用的指示,负责建立连接、发送数据以及断开连接。TCP 提供将应用层发来的数据顺利发送至对端的可靠传输。为了实现这一功能,需要将应用层数据封装为报文段 (segment)并附加一个 TCP 首部然后交给下面的 IP 层。
  3. IP 将 TCP 传过来的 TCP 首部和 TCP 数据合起来当做自己的数据,并在 TCP 首部的前端 加上自己的 IP 首部生成 IP 数据报(datagram)然后交给下面的数据链路层。
  4. 从 IP 传过来的 IP 包对于数据链路层来说就是数据。给这些数据附加上链路层首部封装为链路层帧(frame),生成的链路层帧(frame)将通过物理层传输给接收端。
  5. 然后到了李四的计算机,就会逆向的进行上面的过程,将消息最后传输给应用程序,这样李四就收到了张三的消息。

上面就是整个计算机网络基于TCP通信的大致过程,那么现在的问题是操作系统内部是如何进行IO的

我们知道IO无非就是两个核心点,读数据和写数据,我们的应用程序是工作在操作系统的用户态时,当应用程序要执行IO时,用户态需要通过系统调用从用户态切换到核心态。如果应用程序现在在执行读操作,那么操作系统首先会将接收到的网络IO数据存储在内核缓冲,然后将内核缓存准备好的数据拷贝到用户缓存区,然后应用程序就可以处理接收到的数据了。如果应用程序正在执行写操作,那么操作系统需要将应用程序准备好的数据从用户缓存拷贝到内核缓存,接着发送出去,下图就展示了大致的细节。

可以发现上面的过程经过了多次的操作系统用户态到内核态的切换,这是很耗时的,可以使用0拷贝等相关技术进行优化,这里就不详细分析了。

在这里插入图片描述
下面我们更加深入:

  • 读数据
    ①首先在网络的网卡上或本地存储设备中准备数据,然后调用read()函数。
    ②调用read()函数后,由内核将网络/本地数据读取到内核缓冲区中。
    ③读取完成后向CPU发送一个中断信号,通知CPU对数据进行后续处理。
    ④CPU将内核中的数据写入到对应的程序缓冲区或网络Socket接收缓冲区中。
    ⑤数据全部写入到缓冲区后,应用程序开始对数据开始实际的处理。

程序中试图利用IO机制读写数据时,仅仅只是调用了内核提供的接口函数而已,本质上真正的IO操作还是由内核自己去完成的。Linux 系统为了提高 IO 效率,会在用户空间和内核空间都加入缓冲区(缓冲区可以减少频繁的系统 IO 调 用。系统调用需要保存之前的进程数据和状态等信息,而结束调用之后回来还需要恢复之前的信息,为 了减少这种损耗时间、也损耗性能的系统调用,于是出现了缓冲区)

  • 写数据
    ①应用程序准备要写入的数据,可能是从用户输入、其他应用程序输出或者本地文件等获取的数据。
    ②当应用程序调用write()函数时,数据被写入到应用程序的内核缓冲区。
    ③CPU处理写操作,内核在写入数据到内核缓冲区后,向CPU发送一个中断信号,通知CPU有数据需要写入到指定的目的地(例如硬盘或网络)。
    ④发送完成通知,当数据全部写入到目标设备或网络中时,系统可能会向应用程序发送一个写入完成的通知。

二、BIO底层原理

1. 什么是Socket

Socket 是应用层与 TCP/IP 协议族通信的中间软件抽象层,它是一组接口,一般由操作系统提供。在设计模式中,Socket 其实就是一个门面模式,它把复杂的 TCP/IP 协议处理和通信缓存管理等等都隐藏在 Socket 接口后面,对用户来说,使用一组简单的接口就能进行网络应用编程,让 Socket 去组织数据,以符合指定的协议。主机 A 的应用程序要能和主机 B 的 应用程序通信,必须通过 Socket 建立连接。客户端连接上一个服务端,就会在客户端中产生一个 socket 接口实例,服务端每接受 一个客户端连接,就会产生一个 socket 接口实例和客户端的 socket 进行通信,有多个客户端连接自然就有多个 socket 接口实例。

在这里插入图片描述

2. JDK原生编程的BIO

BIO也就是阻塞式IO。在 BIO 中类 ServerSocket 负责绑定IP地址,启动监听端口,等待客户连接;客户端 Socket 类的实例发起连接操作,ServerSocket 接受连接后产生一个新的服务端 socket 实例负责和客户端 socket 实例通过输入和输出流进行通信。

在这里插入图片描述
BIO阻塞的含义体现在两个方面:

  1. 若一个服务器启动就绪,那么主线程就一直在等待着客户端的连接,这个等待过程中主线程就一直在阻塞。
  2. 在连接建立之后,在读取到 socket 信息之前,客户端线程也是一直在等待,一直处于阻塞的状态下的。

我们看一个java实现的BIO通信模式的案例的代码,首先是服务端:

 public static void main(String[] args) throws IOException {//服务端启动必备ServerSocket serverSocket = new ServerSocket();//表示服务端在哪个端口上监听serverSocket.bind(new InetSocketAddress(10001));System.out.println("Start Server ....");try{while(true){new Thread(new ServerTask(serverSocket.accept())).start();}}finally {serverSocket.close();}}//每个和客户端的通信都会打包成一个任务,交个一个线程来执行private static class ServerTask implements Runnable{private Socket socket = null;public ServerTask(Socket socket){this.socket = socket;}@Overridepublic void run() {//实例化与客户端通信的输入输出流try(ObjectInputStream inputStream =new ObjectInputStream(socket.getInputStream());ObjectOutputStream outputStream =new ObjectOutputStream(socket.getOutputStream())){//接收客户端的输出,也就是服务器的输入String userName = inputStream.readUTF();System.out.println("Accept client message:"+userName);//服务器的输出,也就是客户端的输入outputStream.writeUTF("Hello,"+userName);outputStream.flush();}catch(Exception e){e.printStackTrace();}finally {try {socket.close();} catch (IOException e) {e.printStackTrace();}}}}

首先定义了一个ServerSocket方法并调用accept方法去监听10001端口,当然上面代码是创建了一个新的线程来专门监听10001端口,我们看看accept方法底层到底在做什么?

 public Socket accept() throws IOException {if (isClosed())throw new SocketException("Socket is closed");if (!isBound())throw new SocketException("Socket is not bound yet");Socket s = new Socket((SocketImpl) null);implAccept(s);return s;}

首先它会调用isClosed()方法判断当前的ServerSocket是否已经关闭了,ServerSocket声明了一个closed变量,来维护ServerSocekt的状态。

 private boolean closed = false;
public boolean isClosed() {synchronized(closeLock) {return closed;}}

下面代码用于判断当前的SocketServer是否已经与端口绑定了,ServerSocekt底层同样是有一个bound成员变量来维护当前ServerSocket的绑定状态。

if (!isBound())throw new SocketException("Socket is not bound yet");

上面代码中我们调用了bind方法来将ServerSocket与指定的端口进行绑定,下面我们看看绑定的时候底层在做什么?

public void bind(SocketAddress endpoint, int backlog) throws IOException {if (isClosed())throw new SocketException("Socket is closed");if (!oldImpl && isBound())throw new SocketException("Already bound");if (endpoint == null)endpoint = new InetSocketAddress(0);if (!(endpoint instanceof InetSocketAddress))throw new IllegalArgumentException("Unsupported address type");InetSocketAddress epoint = (InetSocketAddress) endpoint;if (epoint.isUnresolved())throw new SocketException("Unresolved address");if (backlog < 1)backlog = 50;try {SecurityManager security = System.getSecurityManager();if (security != null)security.checkListen(epoint.getPort());getImpl().bind(epoint.getAddress(), epoint.getPort());getImpl().listen(backlog);bound = true;} catch(SecurityException e) {bound = false;throw e;} catch(IOException e) {bound = false;throw e;}}

其实核心的就下面两句代码,一个是绑定动作,一个是监听动作,监听动作底层调用了socketListen这一个native方法。

getImpl().bind(epoint.getAddress(), epoint.getPort());
getImpl().listen(backlog);

回到accept方法,接着它创建了一个Socket对象

Socket s = new Socket((SocketImpl) null);

然后调用了implAccept(s)方法,参数是上面我们创建的Socekt,我们进入该方法。

protected final void implAccept(Socket s) throws IOException {SocketImpl si = null;try {if (s.impl == null)//用 setImpl() 方法,该方法用于设置 Socket 对象的底层实现。s.setImpl();else {//调用 reset() 方法,该方法用于重置 Socket 对象的底层实现。s.impl.reset();}si = s.impl;s.impl = null;si.address = new InetAddress();//指定文件描述符si.fd = new FileDescriptor();//这个accept底层也是调用的socketListen这个native方法getImpl().accept(si);SecurityManager security = System.getSecurityManager();if (security != null) {security.checkAccept(si.getInetAddress().getHostAddress(),si.getPort());}} catch (IOException e) {if (si != null)si.reset();s.impl = si;throw e;} catch (SecurityException e) {if (si != null)si.reset();s.impl = si;throw e;}s.impl = si;s.postAccept();}

socketListen 方法通常是在底层操作系统或网络库中实现的,用于启动套接字的监听过程。这个方法在大多数情况下是阻塞的,因为它需要等待客户端的连接请求到达。当调用 socketListen 方法后,套接字会进入监听状态,等待客户端连接请求。在这个过程中,如果没有客户端连接请求到达,socketListen 方法会一直阻塞,直到有新的连接请求到达或者发生超时。

上面方法我们会阻塞在 getImpl().accept(si);然后一旦客户端有连接来,就会立即返回accept方法,并将新创建的Socket返回,重新回顾整个过程,服务端程序一直阻塞等待,如果客户端有连接来了就会创建一个新的Socket用于与该连接通信,上面有个疑问的地方就是bind和accept方法好像都有一个socketListen那么意味bind方法执行后是否就可以处理客户端连接了?我的个人理解是前者主要是用来建立TCP连接的,参考[这篇博客]。

上面就是传统的BIO的通信模型,采用 BIO 通信模型的服务端,通常由一个独立的 Acceptor 线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理(上面的案例我没有使用多线程处理,而是服务端就一个线程),处理完成后,通过输出流返回应答给客户端,线程销毁。即典型的一请求一应答模型,同时数据的读取写入也必须阻塞在一个线程内等待其完成。

在这里插入图片描述
该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程 个数和客户端并发访问数呈 1:1 的正比关系,Java 中的线程也是比较宝贵的系统资源,线程数量快速膨胀后,系统的性能将急剧下降,随着访问量的继续增大,系统最终就死掉了。

为了改进这种一连接一线程的模型,我们可以使用线程池来管理这些线程,实现 1 个或 多个线程处理 N 个客户端的模型(但是底层还是使用的同步阻塞 I/O),通常被称为“伪异 步 I/O 模型”。我们知道,如果使用 CachedThreadPool 线程池(不限制线程数量,如果不清楚请参考 文首提供的文章),其实除了能自动帮我们管理线程(复用),看起来也就像是 1:1 的客户 端:线程数模型,而使用 FixedThreadPool 我们就有效的控制了线程的最大数量,保证了系统有限的资源的控制,实现了 N:M 的伪异步 I/O 模型。

在这里插入图片描述
但是,正因为限制了线程数量,如果发生读取数据较慢时(比如数据量大、网络传输慢等),大量并发的情况下,其他接入的消息,只能一直等待,这就是最大的弊端。

三、Java原生编程的NIO

1. 简介

NIO 库是在 JDK 1.4 中引入的。NIO 弥补了原来的 BIO 的不足,它在标准 Java 代码中提供了高速的、面向块的 I/O。NIO 被称为 no-blocking io 或者 new io 都说得通。

2. NIO和BIO的主要区别

  • 面向流和面向缓冲

Java NIO 和 BIO 之间第一个最大的区别是,BIO 是面向流的,NIO 是面向缓冲区的。 Java BIO 面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地 方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。 Java NIO 的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。

  • 阻塞和非阻塞IO

Java IO 的各种流是阻塞的。这意味着,当一个线程调用 read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。Java NIO 的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞 IO 的空闲时间用于在其它通道上执行 IO 操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。

3. Reactor模式

Reator模式可以看[这篇博客]。

4. NIO的三大核心组件

  • Selector

Selector 的英文含义是“选择器”,也可以称为为“轮询代理器”、“事件订阅器”、“channel 容器管理机”都行。
Java NIO 的选择器允许一个单独的线程来监视多个输入通道,你可以注册多个通道使用一个选择器(Selectors),然后使用一个单独的线程来操作这个选择器,进而“选择”通道:这些通道里已经有可以处理的输入,或者选择已准备写入的通道。这种选择机制,使得一个单独的线程很容易来管理多个通道。应用程序将向 Selector 对象注册需要它关注的 Channel,以及具体的某一个 Channel 会对哪些 IO 事件感兴趣。Selector 中也会维护一个“已经注册的 Channel”的容器。

  • Channels

通道,被建立的一个应用程序和操作系统交互事件、传递内容的渠道(注意是连接到操作系统)。那么既然是和操作系统进行内容的传递,那么说明应用程序可以通过通道读取数据,也可以通过通道向操作系统写数据,而且可以同时进行读写。
• 所有被 Selector(选择器)注册的通道,只能是继承了SelectableChannel 类的子类。
• ServerSocketChannel:应用服务器程序的监听通道。只有通过这个通道,应用程序才能向操作系统注册支持“多路复用 IO”的端口监听。同时支持 UDP 协议和 TCP 协议。
• ScoketChannel:TCP Socket套接字的监听通道,一个Socket套接字对应了一个客户端IP端口 到服务器 IP端口的通信连接。 通道中的数据总是要先读到一个 Buffer,或者总是要从一个 Buffer 中写入。

  • Buffer

我们前面说过 JDK NIO 是面向缓冲的。Buffer 就是这个缓冲,用于和 NIO 通道进行交互。 数据是从通道读入缓冲区,从缓冲区写入到通道中的。以写为例,应用程序都是将数据写入缓冲,再通过通道把缓冲的数据发送出去,读也是一样,数据总是先从通道读到缓冲,应用 程序再读缓冲的数据。缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存(其实就是数组)。 这块内存被包装成 NIO Buffer 对象,并提供了一组方法,用来方便的访问该块内存。

在这里插入图片描述

5. NIO核心源码分析

首先我们给出服务端的实现:

public class NioServer {private static NioServerHandle nioServerHandle;public static void main(String[] args){nioServerHandle = new NioServerHandle(DEFAULT_PORT);new Thread(nioServerHandle,"Server").start();}}public class NioServerHandle implements Runnable{private volatile boolean started;private ServerSocketChannel serverSocketChannel;private Selector selector;/*** 构造方法* @param port 指定要监听的端口号*/public NioServerHandle(int port) {try {/*创建选择器的实例*/selector = Selector.open();/*创建ServerSocketChannel的实例*/serverSocketChannel = ServerSocketChannel.open();/*设置通道为非阻塞模式*/serverSocketChannel.configureBlocking(false);/*绑定端口*/serverSocketChannel.socket().bind(new InetSocketAddress(port));/*注册事件,表示关心客户端连接*/serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);started = true;System.out.println("服务器已启动,端口号:"+port);} catch (IOException e) {e.printStackTrace();}}@Overridepublic void run() {while(started){try {/*获取当前有哪些事件*/selector.select(1000);/*获取事件的集合*/Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while(iterator.hasNext()){SelectionKey key = iterator.next();/*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活的键出现,这会导致我们尝试再次处理它。*/iterator.remove();handleInput(key);}} catch (IOException e) {e.printStackTrace();}}}/*处理事件的发生*/private void handleInput(SelectionKey key) throws IOException {if(key.isValid()){/*处理新接入的客户端的请求*/if(key.isAcceptable()){/*获取关心当前事件的Channel*/ServerSocketChannel ssc= (ServerSocketChannel) key.channel();/*接受连接*/SocketChannel sc = ssc.accept();System.out.println("==========建立连接=========");sc.configureBlocking(false);/*关注读事件*/sc.register(selector,SelectionKey.OP_READ);}/*处理对端的发送的数据*/if(key.isReadable()){SocketChannel sc = (SocketChannel) key.channel();/*创建ByteBuffer,开辟一个缓冲区*/ByteBuffer buffer = ByteBuffer.allocate(1024);/*从通道里读取数据,然后写入buffer*/int readBytes = sc.read(buffer);if(readBytes>0){/*将缓冲区当前的limit设置为position,position=0,用于后续对缓冲区的读取操作*/buffer.flip();/*根据缓冲区可读字节数创建字节数组*/byte[] bytes = new byte[buffer.remaining()];/*将缓冲区可读字节数组复制到新建的数组中*/buffer.get(bytes);String message = new String(bytes,"UTF-8");System.out.println("服务器收到消息:"+message);/*处理数据*/String result = Const.response(message);/*发送应答消息*/doWrite(sc,result);}else if(readBytes<0){/*取消特定的注册关系*/key.cancel();/*关闭通道*/sc.close();}}}}/*发送应答消息*/private void doWrite(SocketChannel sc,String response) throws IOException {byte[] bytes = response.getBytes();ByteBuffer buffer = ByteBuffer.allocate(bytes.length);buffer.put(bytes);buffer.flip();sc.write(buffer);}public void stop(){started = false;}}

首先NioServerHandle构成方法接受一个参数port,也就是socket要绑定的本地端口,首先它创建了一个选择器实例,Selector就是IO多路复用中的多路复用器,Selector选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态。当这么做的时候,可以选择将被激发的线程挂起,直到有就绪的的通道。
在这里插入图片描述

selector = Selector.open();
public static Selector open() throws IOException {return SelectorProvider.provider().openSelector();
}

这个provider()本质是SelectorProviderl类,是一个抽象类,它定义了创建Selector和Channel实例的方法。不同的操作系统可能有不同的I/O机制和系统调用,因此SelectorProvider的实现类会根据当前平台的特性提供相应的Selector和Channel实例。创建Selector调用了抽象类Selector中的静态方法,open方法。这个方法的返回值是操作系统对应的选择器,这个与你虚拟机所在的系统相关),这里我们就不深纠了。创建好选择器之后就执行下面代码创建了一个ServerSocketChannel对象。

serverSocketChannel = ServerSocketChannel.open();

我们看看这个open()静态方法底层做了一些什么事:

    public static ServerSocketChannel open() throws IOException {return SelectorProvider.provider().openServerSocketChannel();}

它同样调用了SelectorProviderl的provider方法返回了一个适合本系统的Channel实现。然后下面就开始绑定端口了:

serverSocketChannel.socket().bind(new InetSocketAddress(port));public abstract class ServerSocketChannelextends AbstractSelectableChannelimplements NetworkChannel
{public abstract ServerSocket socket();}

根据上面代码我们知道serverSocketChannel内部是封装了ServerSocket的实现了的,所以通道的本质上就是在Socket的基础上封装了更多的操作。下面就是NIO特别的地方了,它将Channel注册到了Selector中:

serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

register方法有两个参数,第一个是通道要注册的选择器,第二个参数就是选择器所关心的通道操作。这个是SelectionKey中定义的四个事件之一,也就是连接事件。它实际上是一个表示选择器在检查通道就绪状态时需要关心的操作的比特掩码。如果 Selector 对通道的多操作类型感兴趣,可以用“位或”操作符来实现,SelectionKey.OP_READ|SelectionKey.OP_WRITE;。

 public static final int OP_READ = 1 << 0;public static final int OP_WRITE = 1 << 2;public static final int OP_CONNECT = 1 << 3;public static final int OP_ACCEPT = 1 << 4;

在这里插入图片描述

注意:一个 Channel 仅仅可以被注册到一个 Selector 一次, 如果将 Channel 注册 到 Selector 多次, 那么其实就是相当于更新 SelectionKey 的 interest set。我们进入SelectionKey类:

   public abstract int interestOps();public abstract int readyOps();

interestOps可以判断 Selector 是否对 Channel 的某种事件感兴趣, readyOps()来获取相关通道已经就绪的操作。然后还有下面两个方法:

 public abstract SelectableChannel channel();abstract Selector selector();public abstract void cancel();

通过上面方法我们可以获取这个 SelectionKey 所关联的 Selector 和 Channel。 如果我们要取消关联关系,SelectionKey 对象的 cancel()方法来取消特定的注册关系。

上面我们服务端的ServerSocketChannel就创建完了,通过上面我们知道上面的核心关键就是创建了Selector并将ServerSoceketChannel关联的SelectionKey注册到了Seletctor中了。下面回到NioServer类,下面就是创建一个新的线程来开启服务端。

 new Thread(nioServerHandle,"Server").start();

NioServerHandle本身就是实现了Runable接口的,所以在上面创建的线程执行run方法的时候,会间接调用到NioServerHandle的run方法,我们进入该方法。

 @Overridepublic void run() {while(started){try {/*获取当前有哪些事件*/selector.select(1000);/*获取事件的集合*/Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while(iterator.hasNext()){SelectionKey key = iterator.next();/*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活的键出现,这会导致我们尝试再次处理它。*/iterator.remove();handleInput(key);}} catch (IOException e) {e.printStackTrace();}}}

首先执行了selector.select(1000)方法,该方法是一个阻塞方法,它会等待一段时间(以毫秒为单位),直到有一个或多个通道准备好进行 I/O 操作、超时时间到达或者当前线程被中断。底层实现会查询注册在 Selector 上的所有通道,检查它们是否处于就绪状态。就绪状态表示通道可以执行某种 I/O 操作,比如读取或写入数据。
当有通道处于就绪状态时,select() 方法会返回对应的通道数量,并且可以通过调用 selector.selectedKeys() 方法获取到这些就绪的 SelectionKey 集合。而在超时时间到达之前,如果没有通道处于就绪状态或者当前线程被中断,select() 方法也会提前返回,返回值为 0。

  public int select(long var1) throws IOException {if (var1 < 0L) {throw new IllegalArgumentException("Negative timeout");} else {return this.lockAndDoSelect(var1 == 0L ? -1L : var1);}}

select最底层也是调用的本地方法,而且它是线程安全的。我们这里只需要知道它会返回就绪的通道的数量。然后调用下面方法来获取所有的就绪的SelectionKey的集合

Set<SelectionKey> selectionKeys = selector.selectedKeys();public Set<SelectionKey> selectedKeys() {if (!this.isOpen() && !Util.atBugLevel("1.4")) {throw new ClosedSelectorException();} else {return this.publicSelectedKeys;}}

然后就迭代就绪的SelectionKey,然后将该事件从集合中删除(表示这个事件已经被处理了),然后就调用了handleInput来开始具体的处理。

 private void handleInput(SelectionKey key) throws IOException {if(key.isValid()){/*处理新接入的客户端的请求*/if(key.isAcceptable()){/*获取关心当前事件的Channel*/ServerSocketChannel ssc= (ServerSocketChannel) key.channel();/*接受连接*/SocketChannel sc = ssc.accept();System.out.println("==========建立连接=========");sc.configureBlocking(false);/*关注读事件*/sc.register(selector,SelectionKey.OP_READ);}/*处理对端的发送的数据*/if(key.isReadable()){SocketChannel sc = (SocketChannel) key.channel();/*创建ByteBuffer,开辟一个缓冲区*/ByteBuffer buffer = ByteBuffer.allocate(1024);/*从通道里读取数据,然后写入buffer*/int readBytes = sc.read(buffer);if(readBytes>0){/*将缓冲区当前的limit设置为position,position=0,用于后续对缓冲区的读取操作*/buffer.flip();/*根据缓冲区可读字节数创建字节数组*/byte[] bytes = new byte[buffer.remaining()];/*将缓冲区可读字节数组复制到新建的数组中*/buffer.get(bytes);String message = new String(bytes,"UTF-8");System.out.println("服务器收到消息:"+message);/*处理数据*/String result = Const.response(message);/*发送应答消息*/doWrite(sc,result);}else if(readBytes<0){/*取消特定的注册关系*/key.cancel();/*关闭通道*/sc.close();}}}}
/*发送应答消息*/private void doWrite(SocketChannel sc,String response) throws IOException {byte[] bytes = response.getBytes();ByteBuffer buffer = ByteBuffer.allocate(bytes.length);buffer.put(bytes);buffer.flip();sc.write(buffer);}

然后它会获得这个SelectionKey所绑定的通道。首先我们可以发现有两个if判断这里就是该SelectionKey绑定的事件是读事件、写事件还是连接事件(注意连接事件是客户端的),如果是连接事件就获得ServerSocketChannel对象:

ServerSocketChannel ssc= (ServerSocketChannel) key.channel();

然后就可以执行下面代码来处理连接了,可以发现也是调用的accept方法,因为前面说过通道的底层是封装了Socket了的。

SocketChannel sc = ssc.accept();sc.register(selector,SelectionKey.OP_READ);

可以发现ssc.accept(),也就是说一旦有连接接入就会创建一个新的SocketChannel对象,然后这个通道也要注册到selector中,绑定事件为读事件,这样就可以接受客户端发来的数据了。

如果SelectionKey绑定的事件是读事件,说明现在已经接受到了用户的数据了我们可以进行处理了,首先我们仍然是从SelectionKey获取对应的通道。

 SocketChannel sc = (SocketChannel) key.channel();

然后从channel中读取数据,注意这里就是和BIO很大的不同的地方,它不是以流的形式读完所有数据,而是读到了一个buffer缓冲中。

 ByteBuffer buffer = ByteBuffer.allocate(1024);

注意此时只是将数据读到了一个缓冲中,应用程序还没有处理数据,现在有了这个缓冲我们就可以很方便的处理接受到的数据了,注意此时buffer要调用flip方法来切换模式。flip 方法将 Buffer 从写模式切换到读模式。调用 flip()方法会将 position 设回 0,并将 limit设置成之前的position。

   buffer.flip();

最后服务端向客户端发出回应

doWrite(sc,result);

以上就是服务端的大致工作中原理,下面我们看看客户端又是怎么工作的。

public class NioClient {private static NioClientHandle nioClientHandle;public static void start(){nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT);//nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,8888);new Thread(nioClientHandle,"client").start();}//向服务器发送消息public static boolean sendMsg(String msg) throws Exception{nioClientHandle.sendMsg(msg);return true;}public static void main(String[] args) throws Exception {start();Scanner scanner = new Scanner(System.in);while(NioClient.sendMsg(scanner.next()));}}public class NioClientHandle implements Runnable{private String host;private int port;private volatile boolean started;private Selector selector;private SocketChannel socketChannel;public NioClientHandle(String ip, int port) {this.host = ip;this.port = port;try {/*创建选择器的实例*/selector = Selector.open();/*创建ServerSocketChannel的实例*/socketChannel = SocketChannel.open();/*设置通道为非阻塞模式*/socketChannel.configureBlocking(false);started = true;} catch (IOException e) {e.printStackTrace();}}public void stop(){started = false;}@Overridepublic void run() {try{doConnect();}catch(IOException e){e.printStackTrace();System.exit(1);}//循环遍历selectorwhile(started){try{//无论是否有读写事件发生,selector每隔1s被唤醒一次selector.select(1000);//获取当前有哪些事件可以使用Set<SelectionKey> keys = selector.selectedKeys();//转换为迭代器Iterator<SelectionKey> it = keys.iterator();SelectionKey key = null;while(it.hasNext()){key = it.next();/*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活的键出现,这会导致我们尝试再次处理它。*/it.remove();try{handleInput(key);}catch(Exception e){if(key != null){key.cancel();if(key.channel() != null){key.channel().close();}}}}}catch(Exception e){e.printStackTrace();System.exit(1);}}//selector关闭后会自动释放里面管理的资源if(selector != null)try{selector.close();}catch (Exception e) {e.printStackTrace();}}//具体的事件处理方法private void handleInput(SelectionKey key) throws IOException{if(key.isValid()){//获得关心当前事件的channelSocketChannel sc = (SocketChannel) key.channel();//连接事件if(key.isConnectable()){if(sc.finishConnect()){socketChannel.register(selector,SelectionKey.OP_READ);}else System.exit(1);}//有数据可读事件if(key.isReadable()){//创建ByteBuffer,并开辟一个1M的缓冲区ByteBuffer buffer = ByteBuffer.allocate(1024);//读取请求码流,返回读取到的字节数int readBytes = sc.read(buffer);//读取到字节,对字节进行编解码if(readBytes>0){//将缓冲区当前的limit设置为position,position=0,// 用于后续对缓冲区的读取操作buffer.flip();//根据缓冲区可读字节数创建字节数组byte[] bytes = new byte[buffer.remaining()];//将缓冲区可读字节数组复制到新建的数组中buffer.get(bytes);String result = new String(bytes,"UTF-8");System.out.println("客户端收到消息:" + result);}//链路已经关闭,释放资源else if(readBytes<0){key.cancel();sc.close();}}}}private void doWrite(SocketChannel channel,String request)throws IOException {//将消息编码为字节数组byte[] bytes = request.getBytes();//根据数组容量创建ByteBufferByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);//将字节数组复制到缓冲区writeBuffer.put(bytes);//flip操作writeBuffer.flip();//发送缓冲区的字节数组/*关心事件和读写网络并不冲突*/channel.write(writeBuffer);}private void doConnect() throws IOException{/*非阻塞的连接*/if(socketChannel.connect(new InetSocketAddress(host,port))){socketChannel.register(selector,SelectionKey.OP_READ);}else{socketChannel.register(selector,SelectionKey.OP_CONNECT);}}//写数据对外暴露的APIpublic void sendMsg(String msg) throws Exception{doWrite(socketChannel, msg);}}

其实有了上面的基础,客户端就很简单了,只是多了一个连接事件,我们看看这部分:

if(socketChannel.connect(new InetSocketAddress(host,port))){socketChannel.register(selector,SelectionKey.OP_READ);}else{socketChannel.register(selector,SelectionKey.OP_CONNECT);}

这个if判断是很关键的,我们知道我们调用connect方法后,底层需要进行TCP的三次握手,如果网络状况不好的话,这个connect方法执行完毕后可能连接并没有执行完毕socketChannel.connect(new InetSocketAddress(host,port))如果为false就说明连接没有建立完,所以需要创建一个通道来处理连接事件,否则我们就可以注册读事件通道来等待服务端回应。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/576486.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

产品经理的自我修养

点击下载《产品经理的自我修养》 1. 前言 在产品领域取得成功的关键在于持续的激情。只有保持热情不减,我们才能克服各种困难,打造出卓越的产品。 如果你真心渴望追求产品之路,我强烈建议你立即行动起来,亲自参与实际的产品创作。无论是建立一个网站、创建一个社群,还是…

黑苹果安装,黑苹果小白详细教程

前言&#xff08;废话&#xff09;本人电脑小白&#xff0c;看了网上很多的教程&#xff0c;整合用了20个小时&#xff0c;反正看的太多了&#xff0c;反而不知道咋弄了&#xff0c;最后看不下去了&#xff0c;就试了一下&#xff0c;结果稀里糊涂的成功了&#xff0c;我也不知…

农村分散式生活污水分质处理及循环利用技术指南

标准已完成意见征集&#xff1a; 本文件给出了农村分散式生活污水分质处理及循环利用的总则、污水收集、污水分质处理、资源化利用、利用模式、运维管理等的指导。 本文件适用于农村分散式生活污水分质处理及循环利用的设施新建、扩建和改建工程的设计、施工与运维。 注:本文件…

【Redis教程0x08】详解Redis过期删除策略内存淘汰策略

引言 Redis的过期删除策略和内存淘汰策略是经常被问道的问题&#xff0c;这两个机制都是做删除操作&#xff0c;但是触发的条件和使用的策略是不同的。今天就来深入理解一下这两个策略。 过期删除策略 Redis 是可以对 key 设置过期时间的&#xff0c;因此需要有相应的机制将…

Aigtek:功率放大器的定义、指标参数及性能特点

功率放大器是一种电子器件或电路&#xff0c;用于将输入信号的功率放大到更高的功率级别。它在各种应用中起到关键的作用&#xff0c;例如音频放大器、射频放大器、激光放大器等。下面西安安泰将介绍功率放大器的定义、指标参数以及其性能特点。 功率放大器的定义&#xff1a; …

自动发卡平台源码优化版配套免签个人支付宝微信插件

这款免签个人支付宝微信插件&#xff0c;配套的是 自动发卡平台源码优化版&#xff0c;支持个人免签支付 其他系统的不支持&#xff01;

政安晨:专栏目录【TensorFlow与Keras实战演绎机器学习】

政安晨的个人主页&#xff1a;政安晨 欢迎 &#x1f44d;点赞✍评论⭐收藏 收录专栏: TensorFlow与Keras实战演绎机器学习 希望政安晨的博客能够对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff01; 本篇是作者政安晨的专栏《TensorFlow与Keras…

33.HarmonyOS App(JAVA)鸿蒙系统app数据库增删改查

33.HarmonyOS App(JAVA)鸿蒙系统app数据库增删改查 关系数据库 关系对象数据库&#xff08;ORM&#xff09; 应用偏好数据库 分布式数据库 关系型数据库&#xff08;Relational Database&#xff0c;RDB&#xff09;是一种基于关系模型来管理数据的数据库。HarmonyOS关系型…

光明源@智慧公厕的建设要求是什么

作为城市公共卫生设施的重要组成部分&#xff0c;智慧公厕的建设要求愈发受到重视。它们不仅是城市形象的窗口&#xff0c;更是为民众提供便捷、舒适的卫生服务的重要载体。那么&#xff0c;究竟什么样的要求才能确保智慧公厕的高效建设呢&#xff1f;今日&#xff0c;让我们一…

基于Java仓库管理系统设计与实现(源码+部署文档+论文)

博主介绍&#xff1a; ✌至今服务客户已经1000、专注于Java技术领域、项目定制、技术答疑、开发工具、毕业项目实战 ✌ &#x1f345; 文末获取源码联系 &#x1f345; &#x1f447;&#x1f3fb; 精彩专栏 推荐订阅 &#x1f447;&#x1f3fb; 不然下次找不到 Java项目精品实…

【STM32CubeMX(3)】GPIO上拉输入——读取按键状态

通过本节可以学习到&#xff1a; 如何在CubeMX配置上拉输入什么是上拉输入如何读取一个GPIO的输入状态 软件环境&#xff1a; STM32CubeMX version6.10.0 Keil_v5&#xff08;MDK-ARM&#xff09; version 5.32 硬件环境&#xff1a; STM32F103C8T6最小系统板&#xff08;…

基于双vip+GTID的半同步主从复制集群项目(MySQL集群)

项目标题&#xff1a;基于keepalivedGTID的半同步主从复制MySQL集群 准备七台机器&#xff0c;其中有四台时MySQL服务器&#xff0c;搭建主从复制的集群&#xff0c;一个master&#xff0c;2个slave服务器&#xff0c;一个延迟备份服务器。同时延迟备份服务器也可以充当异地备…