什么是 Netty
Netty的官网: [https://netty.io/
Netty是一个Java NIO技术的开源异步事件驱动的网络编程框架,用于快速开发可维护的高性能协议服务器和客户端。
往通俗了讲,可以将 Netty 理解为:一个将Java NIO进行了大量封装,并大大降低Java NIO使用难度和上手门槛的网络编程框架。
Netty 的特点
- 高并发:基于 NIO(Nonblocking IO,非阻塞IO)开发,对比于 BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高
- 传输快:传输依赖于零拷贝特性,尽量减少不必要的内存拷贝,实现了更高效率的传输
- 封装好:封装了 NIO 操作的很多细节,提供了易于使用调用接口
Netty 的优势
- 使用简单:封装了 NIO 的很多细节,使用更简单
- 功能强大:预置了多种编解码功能,支持多种主流协议
- 扩展性强:可以通过 ChannelHandler 对通信框架进行灵活地扩展
- 性能优异:通过与其他业界主流的 NIO 框架对比,Netty 的综合性能最优
- 运行稳定:Netty 修复了已经发现的所有 NIO 的 bug,让开发人员可以专注于业务本身
- 社区活跃:Netty 是活跃的开源项目,版本迭代周期短,bug 修复速度快
Netty 能做什么
- 分布式系统中的RPC通信
- 在分布式系统中,各个节点之间需要进行远程服务调用,Netty 作为高性能的通信框架,经常被用作 RPC 框架的基础通信组件。
- 例如,阿里巴巴的分布式服务框架 Dubbo 使用 Netty 作为其默认的通信框架。
- 即时通讯(IM)系统
- 即时通讯系统需要处理大量的并发连接和低延迟的消息传输,Netty 的高并发和低延迟特性使其成为构建这类系统的理想选择。
- 消息推送系统
- 消息推送系统需要向多个客户端实时推送消息,Netty 能够高效地处理这些推送操作。
- 物联网(IoT)
- 物联网应用中,大量的设备需要与服务器进行通信,Netty 可以作为设备和服务器之间通信的基础框架。
- 游戏行业
- 在游戏行业中,Netty 可用于开发高性能的游戏服务器,处理游戏内的数据通信。
- 大数据分布式计算
- 在大数据领域,Netty 可用于构建分布式计算框架,如 Apache Spark 和 Apache Storm,这些框架需要高效地处理大量数据的传输。
- Web应用
- Netty 可用于构建高性能的Web服务器和代理服务器,支持 HTTP、WebSocket 等协议。
- 微服务架构
- 在微服务架构中,服务之间的通信需要快速且可靠,Netty 可以作为服务间通信的底层框架。
Netty 的高性能、高可靠性、易用性和可扩展性使其成为互联网应用开发中的首选网络编程框架之一。
核心概念
Channel(通道)
Channel (通道)是 Netty 中的网络操作抽象,代表一个网络连接
Netty 提供了多种类型的 Channel,例如 NioSocketChannel(用于客户端 TCP 连接)和 NioServerSocketChannel(用于服务器端 TCP 监听)。
ChannelHandler(通道处理器)
ChannelHandler (通道处理器)是 Netty 中用于处理网络事件的接口(即接收通道消息)
常见的 ChannelHandler 包括 SimpleChannelInboundHandler、ChannelInboundHandlerAdapter 等。
ChannelPipeline(通道流水线)
ChannelPipeline(通道流水线) 是一个处理器链,包含了一系列的 ChannelHandler(通道处理器),数据和事件在 ChannelPipeline 中按照顺序被多个 ChannelHandler 处理
ChannelInitializer(通道初始化器)
ChannelInitializer 用于初始化新创建的 Channel,通常在其中设置 ChannelPipeline。它允许动态地为 Channel 设置处理器,通常在服务器端使用。
Bootstrap(引导启动辅助类)
Bootstrap 是 Netty 的客户端启动辅助类,用于配置客户端 Channel 的参数并启动客户端。服务器端也有一个对应的 ServerBootstrap 类。
Future 和 ChannelFuture
Future 表示一个异步操作的结果,可以在操作完成时获取结果或处理通知。ChannelFuture 是 Future 的一个子类型,专门用于表示 Channel 的异步操作。
Netty 编程案例
1. 创建 Maven 工程
2. 引入 Netty Maven 依赖
<!-- netty -->
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.87.Final</version>
</dependency>
3. 创建服务器端启动类
package com.binge.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;public class NettyServer {//服务器监听端口private int port;//构造方法public NettyServer(int port) {this.port = port;}public void run() throws InterruptedException {// 创建线程池,用于处理服务器的接受连接请求EventLoopGroup bossGroup = new NioEventLoopGroup();// 创建线程池,用于处理已经接受的连接,包括读取数据、处理数据和发送数据EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 创建服务器启动器ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup) //设置线程池.channel(NioServerSocketChannel.class) //实例化一个监听通道(Channel).childHandler(new ChannelInitializer() { //初始化连接通道配置@Overrideprotected void initChannel(Channel channel) throws Exception {// 设置自定义通道消息处理器channel.pipeline().addLast(new StringDecoder(), new StringEncoder(), new ServerChannelHandler());}}).option(ChannelOption.SO_BACKLOG, 128) //设置服务器可以挂起未处理的连接的数量.childOption(ChannelOption.SO_KEEPALIVE, true); // 设置 TCP 的保活机制,用于检测死连接// 绑定服务器监听端口, 同步等待成功ChannelFuture future = bootstrap.bind(port).sync();//打印服务器启动信息System.out.println("Server started on port " + port);// 开启通道监监听器,监听通道是否关闭future.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public static void main(String[] args) {// 创建服务器实例NettyServer server = new NettyServer(8888);try {server.run(); // 启动服务器} catch (InterruptedException e) {e.printStackTrace();}}
}
4. 创建服务器端通道处理器
package com.binge.server;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;public class ServerChannelHandler extends SimpleChannelInboundHandler {/*** 客户端连接成功* @param ctx 通道上下文*/public void channelActive(ChannelHandlerContext ctx) {System.out.println("连接客户端成功..");}/*** 处理接收到的消息* @param ctx 通道上下文* @param message 消息对象*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object o) {System.out.println("Server received: " + o.toString());ctx.writeAndFlush("Hello,client!");}/*** 处理I/O事件的异常* @param ctx 通道上下文* @param cause 异常原因*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {if (cause instanceof java.net.SocketException) {System.out.println("客户端连接已断开..");} else {System.err.println("服务器捕获到异常,但将继续运行... ");cause.printStackTrace();}// 不要调用ctx.close(),这样连接就不会关闭}
}
5. 创建客户端启动类
package com.binge.client;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;public class NettyClient {//服务器主机地址private String host;//服务器监听端口private int port;//构造方法public NettyClient(String host, int port) {this.host = host;this.port = port;}public void run() throws InterruptedException {//创建线程池,处理客户端请求连接和接受服务器消息EventLoopGroup group = new NioEventLoopGroup();try {// 创建客户端启动器Bootstrap bootstrap = new Bootstrap();bootstrap.group(group) //设置线程池.channel(NioSocketChannel.class) //实例化一个TCP连接通道.handler(new ChannelInitializer() { //初始化连接通道配置@Overrideprotected void initChannel(Channel channel) {//设置自定义通道消息处理器channel.pipeline().addLast(new StringDecoder(), new StringEncoder(), new ClientChannelHandler());}});// 连接服务器并等待连接成功ChannelFuture future = bootstrap.connect(host, port).sync();//打印连接服务器成功信息System.out.println("Connected to server: " + host + ":" + port);//发送消息给服务器future.channel().writeAndFlush("Hello, server!");//等待通道关闭future.channel().close();//开启通道监监听器,监听通道是否关闭future.channel().closeFuture().sync();} finally {//优雅关闭线程池group.shutdownGracefully();}}public static void main(String[] args) {// 创建客户端实例NettyClient client = new NettyClient("localhost", 8888);try {client.run(); // 启动客户端} catch (InterruptedException e) {e.printStackTrace();}}
}
6. 创建客户端通道处理器
package com.binge.client;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;public class ClientChannelHandler extends SimpleChannelInboundHandler {/*** 服务器连接成功* @param ctx 通道上下文*/public void channelActive(ChannelHandlerContext ctx) {System.out.println("连接服务器成功..");}/*** 读取通道接收的消息* @param ctx 通道上下文* @param message 接收的消息*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object message) {System.out.println(message);}
}
7. 工程目录结构
8. 测试
运行 NettyServer 启动服务器(监听端口)
Server started on port 8888
运行 NettyClient 启动客户端
连接服务器成功..
Connected to server: localhost:8888
这是服务器端显示如下:
Server started on port 8888
连接客户端成功..
Server received: Hello, server!
测试成功:)