1、socket通信建立流程
1.1、创建服务端流程
-
使用 socket 函数来创建 socket服务。
-
使用 bind 函数绑定端口。
-
使用 listen 函数监听端口。
-
使用 accept 函数接收客户端请求。
1.2、创建客户端流程
-
使用 socket 函数来创建 socket 服务。
-
使用 connect 函数连接到 socket 服务端。
以下图表演示了客户端与服务端之间的通信流程:
1.3、创建服务端代码
package com.example.dyc.mysocket;import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class MySocketServer {// 预定义字典private static final Map<String, String> dictionary;//字典初始化static {dictionary = new HashMap<>();dictionary.put("apple", "苹果");dictionary.put("pear", "梨");}//定义服务器端口,范围在[0, 65535]public static final int PORT = 8888;public static void main(String[] args) throws IOException {System.out.println(new Date() + ":" + 1);ServerSocket serverSocket = new ServerSocket(PORT);//从CLOSED到LISTEN状态System.out.println(new Date() + ":" + 2);ExecutorService executorService = Executors.newFixedThreadPool(10);while (true) {System.out.println(new Date() + ":" + 3);Socket socket = serverSocket.accept(); // 阻塞,等待客户端发起连接,建立连接,到ESTABLISHED状态System.out.println(new Date() + ":" + 4);SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();//得到客户端地址+端口System.out.println(new Date() + ":" + remoteSocketAddress);InputStream inputStream = socket.getInputStream();//得到输入流Scanner scanner = new Scanner(inputStream, "UTF-8");//字符集编码OutputStream outputStream = socket.getOutputStream();//得到输出流Writer writer = new OutputStreamWriter(outputStream, "UTF-8");//字符集编码PrintWriter printWriter = new PrintWriter(writer);// TCP 是一种流式数据,没有明显分界的// 隐含着我们的请求一定 XXXX\nString request = scanner.nextLine();//由scanner得到客户端输入String response = dictionary.getOrDefault(request, "没有找到");// 响应的协议也是 XXX\nprintWriter.println(response);//把响应传入输出printWriter.flush();//发送给客户端socket.close(); // 关闭连接}}
}
1.4、创建客户端代码
package com.example.dyc.mysocket;import java.io.*;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Date;
import java.util.Scanner;public class MySocketClient {public static void main(String[] args) throws IOException {System.out.println(new Date() + ":" + 1);Socket socket = new Socket("127.0.0.1", 8888);//刚建立完连接,传入客户端IP地址和端口SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress();//得到服务器地址+端口System.out.println(new Date() + ":" + remoteSocketAddress);InputStream inputStream = socket.getInputStream();//得到输入流Scanner scanner = new Scanner(inputStream, "UTF-8");//字符集编码OutputStream outputStream = socket.getOutputStream();//得到输出流Writer writer = new OutputStreamWriter(outputStream, "UTF-8");//字符集编码PrintWriter printWriter = new PrintWriter(writer);printWriter.println("apple");//把apple传入输出printWriter.flush();//输出发送给服务器String response = scanner.nextLine();//由scanner得到输入的服务器响应System.out.println(new Date() + ":" + response);socket.close();//关闭连接}
}
2、socket实现BIO
2.1、BIO
传统的网络通讯模型,就是BIO,同步阻塞IO, 其实就是服务端创建一个ServerSocket, 然后就是客户端用一个Socket去连接服务端的那个ServerSocket, ServerSocket接收到了一个的连接请求就创建一个Socket和一个线程去跟那个Socket进行通讯。接着客户端和服务端就进行阻塞式的通信,客户端发送一个请求,服务端Socket进行处理后返回响应,在响应返回前,客户端那边就阻塞等待,什么事情也做不了。 这种方式的缺点, 每次一个客户端接入,都需要在服务端创建一个线程来服务这个客户端,这样大量客户端来的时候,就会造成服务端的线程数量可能达到了几千甚至几万,这样就可能会造成服务端过载过高,最后崩溃死掉。
2.2、代码实现
Client
import java.net.InetSocketAddress;
import java.net.Socket;
import java.text.SimpleDateFormat;
import java.util.Date;public class MyBIOClient {public static void main(String[] args) throws Exception {// 创建 Socket 客户端Socket socket = new Socket();// 与服务端建立连接socket.connect(new InetSocketAddress("127.0.0.1", 8081));SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒");int counter = 0;while (counter < 5) {String now = simpleDateFormat.format(new Date());// 发送请求socket.getOutputStream().write(now.getBytes("UTF-8"));socket.getOutputStream().flush();Thread.sleep(1000);counter++;}// 若方法运行结束后,不调用 close 函数,服务端则会报错:java.net.SocketException: Connection resetsocket.close();System.out.println("客户端关闭了 Socket 连接~!");}
}
Serve
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class MyBIOServe {public static void main(String[] args) throws Exception {// 创建 Socket 服务端,并设置监听的端口ServerSocket serverSocket = new ServerSocket(8081);// 创建线程池以执行客户端请求(防止因请求过多,而导致的阻塞)ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 10, 60,TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());while (true) {// 阻塞方法,监听客户端请求Socket socket = serverSocket.accept();System.out.println("\r\n" + socket);// 创建自定义请求处理器SocketHandler handler = new SocketHandler(socket);// 处理客户端请求poolExecutor.execute(handler);}}
}
SocketHandler
public class SocketHandler implements Runnable {private Socket socket;private static final byte[] BUFFER = new byte[1024];@Overridepublic void run() {try {while (true){System.out.println(Thread.currentThread().getName());// 读取客户端 Socket 请求数据int read = socket.getInputStream().read(BUFFER);if (read != -1) {System.out.println(new String(BUFFER, "UTF-8"));}else{socket.close();System.out.println("服务端关闭了 Socket 连接~!");break;}}} catch (Exception e) {e.printStackTrace();}}public SocketHandler(Socket socket) {this.socket = socket;}
}
3、NIO
3.1、NIO
NIO: NIO是一种同步非阻塞IO, 基于Reactor模型来实现的。其实相当于就是一个线程处理大量的客户端的请求,通过一个线程轮询大量的channel,每次就获取一批有事件的channel,然后对每个请求启动一个线程处理即可。这里的核心就是非阻塞,就那个selector一个线程就可以不停轮询channel,所有客户端请求都不会阻塞,直接就会进来,大不了就是等待一下排着队而已。这里面优化BIO的核心就是,一个客户端并不是时时刻刻都有数据进行交互,没有必要死耗着一个线程不放,所以客户端选择了让线程歇一歇,只有客户端有相应的操作的时候才发起通知,创建一个线程来处理请求。
3.2、代码实现
MyNIOClient2
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;public class MyNIOClient2 {public static void main(String[] args) {//创建远程地址InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);SocketChannel channel = null;//定义缓存ByteBuffer buffer = ByteBuffer.allocate(1024);try {//开启通道channel = SocketChannel.open();//连接远程远程服务器channel.connect(address);Scanner sc = new Scanner(System.in);while (true) {System.out.println("客户端即将给 服务器发送数据..");String line = "clinet2:" + sc.nextLine();if (line.equals("exit")) {break;}//控制台输入数据写到缓存buffer.put(line.getBytes("UTF-8"));//重置buffer 游标buffer.flip();//数据发送到数据channel.write(buffer);//清空缓存数据buffer.clear();//读取服务器返回的数据int readLen = channel.read(buffer);if (readLen == -1) {break;}//重置buffer游标buffer.flip();byte[] bytes = new byte[buffer.remaining()];//读取数据到字节数组buffer.get(bytes);System.out.println("收到了服务器发送的数据 : " + new String(bytes, "UTF-8"));buffer.clear();}} catch (IOException e) {e.printStackTrace();} finally {if (null != channel) {try {channel.close();} catch (IOException e) {e.printStackTrace();}}}}
}
MyNIOService
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Scanner;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class MyNIOService extends Thread {//1.声明多路复用器private Selector selector;//2.定义读写缓冲区private ByteBuffer readBuffer = ByteBuffer.allocate(1024);private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);//3.定义构造方法初始化端口public MyNIOService(int port) {init(port);}//4.main方法启动线程public static void main(String[] args) {new Thread(new MyNIOService(8888)).start();}//5.初始化private void init(int port) {try {System.out.println("服务器正在启动......");//1)开启多路复用器this.selector = Selector.open();//2) 开启服务通道ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();//3)设置为非阻塞serverSocketChannel.configureBlocking(false);//4)绑定端口serverSocketChannel.bind(new InetSocketAddress(port));//5)注册,标记服务通标状态serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);System.out.println("服务器启动完毕");} catch (IOException e) {e.printStackTrace();}}public void run() {while (true) {try {//1.当有至少一个通道被选中,执行此方法this.selector.select();//2.获取选中的通道编号集合Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();//3.遍历keyswhile (keys.hasNext()) {SelectionKey key = keys.next();//4.当前key需要从集合中移出,如果不移出,下次循环会执行对应的逻辑,造成业务错乱keys.remove();//5.判断通道是否有效if (key.isValid()) {try {//6.判断是否可读if (key.isAcceptable()) {accept(key);}} catch (CancelledKeyException e) {//出现异常断开连接key.cancel();}try {//7.判断是否可读if (key.isReadable()) {read(key);}} catch (CancelledKeyException e) {//出现异常断开连接key.cancel();}try {//8.判断是否可写if (key.isWritable()) {write(key);}} catch (CancelledKeyException e) {//出现异常断开连接key.cancel();}}}} catch (IOException e) {e.printStackTrace();}}}private void accept(SelectionKey key) {try {//1.当前通道在init方法中注册到了selector中的ServerSocketChannelServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();//2.阻塞方法, 客户端发起后请求返回.SocketChannel channel = serverSocketChannel.accept();//3.serverSocketChannel设置为非阻塞channel.configureBlocking(false);//4.设置对应客户端的通道标记,设置次通道为可读时使用channel.register(this.selector, SelectionKey.OP_READ);} catch (IOException e) {e.printStackTrace();}}//使用通道读取数据private void read(SelectionKey key) {try {//清空缓存this.readBuffer.clear();//获取当前通道对象SocketChannel channel = (SocketChannel) key.channel();//将通道的数据(客户发送的data)读到缓存中.int readLen = channel.read(readBuffer);//如果通道中没有数据if (readLen == -1) {//关闭通道key.channel().close();//关闭连接key.cancel();return;}//Buffer中有游标,游标不会重置,需要我们调用flip重置. 否则读取不一致this.readBuffer.flip();//创建有效字节长度数组byte[] bytes = new byte[readBuffer.remaining()];//读取buffer中数据保存在字节数组readBuffer.get(bytes);System.out.println("收到了从客户端 " + channel.getRemoteAddress() +" : " + new String(bytes, "UTF-8"));//注册通道,标记为写操作channel.register(this.selector, SelectionKey.OP_WRITE);} catch (Exception e) {}}//给通道中写操作private void write(SelectionKey key) {//清空缓存this.readBuffer.clear();//获取当前通道对象SocketChannel channel = (SocketChannel) key.channel();//录入数据Scanner scanner = new Scanner(System.in);try {System.out.println("即将发送数据到客户端..");String line = scanner.nextLine();//把录入的数据写到Buffer中writeBuffer.put(line.getBytes("UTF-8"));//重置缓存游标writeBuffer.flip();channel.write(writeBuffer);//清空writeBufferwriteBuffer.clear();channel.register(this.selector, SelectionKey.OP_READ);} catch (Exception e) {e.printStackTrace();}}
}
4、Netty
ps:后面再补充
参考文献:
Socket 之 BIO、NIO、Netty 简单实现 - 知乎 (zhihu.com)
什么是NIO?NIO和BIO,AIO之间的区别是什么? - 知乎 (zhihu.com)
java.nio.Buffer 中的 flip()方法_wrap.flip()-CSDN博客