文章目录
- 概要
- pom依赖
- Netty的server服务端类
- Netty通道初始化
- I/O数据读写处理
- 测试发送消息 并 接收服务端回复
- 异步启动Netty
- 运行截图
概要
Netty是业界最流行的nio框架之一,它具有功能强大、性能优异、可定制性和可扩展性的优点
Netty的优点:
1.API使用简单,开发入门门槛低。
2.功能十分强大,预置多种编码解码功能,支持多种主流协议。
3.可定制、可扩展能力强,可以通过其提供的ChannelHandler进行灵活的扩展。
4.性能优异,特别在综合性能上的优异性。
5.成熟,稳定,适用范围广。
6.可用于智能GSM/GPRS模块的通讯服务端开发,使用它进行MQTT协议的开发。
好了,废话不多说了,上代码
pom依赖
<!-- netty依赖 springboot2.x自动导入版本 --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency>
Netty的server服务端类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** Netty服务器(端口自行更换,默认端口10100)* @author wusiwee*/
@Service
@Slf4j
public class NettyServer {/*** 注入Netty通道初始化处理器*/private final NettyChannelInboundHandlerAdapter handlerAdapter;/*** 通过构造函数注入依赖* @param handlerAdapter 处理器*/@Autowiredpublic NettyServer(NettyChannelInboundHandlerAdapter handlerAdapter) {this.handlerAdapter = handlerAdapter;}/*** 启动Netty服务器* @throws Exception 如果启动过程中发生异常*/public void bind() throws Exception {// 定义bossGroup和workerGroup来处理网络事件// 用于接受客户端连接EventLoopGroup bossGroup = new NioEventLoopGroup(1);// 用于实际的业务处理操作EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 创建ServerBootstrap实例来引导绑定和启动服务器ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup)// 指定使用NIO的传输Channel.channel(NioServerSocketChannel.class)// 设置TCP接收缓冲区大小.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576)).childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576))// 设置自定义的Channel初始化器.childHandler(new NettyChannelInitializer(handlerAdapter));log.info("netty server start success!");// 绑定端口,并同步等待成功,即启动Netty服务ChannelFuture f = serverBootstrap.bind(10100).sync();// 等待服务端监听端口关闭f.channel().closeFuture().sync();} catch (InterruptedException e) {log.error("Netty server startup interrupted", e);Thread.currentThread().interrupt();} finally {// 优雅关闭事件循环组bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
Netty通道初始化
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Component;/*** 通道初始化* @author wusiwee*/
@Component
public class NettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {/*** 注入,目的是在该 HandlerAdapter 可以正确的注入业务Service*/private final NettyChannelInboundHandlerAdapter handlerAdapter;public NettyChannelInitializer(NettyChannelInboundHandlerAdapter handlerAdapter) {this.handlerAdapter = handlerAdapter;}@Overrideprotected void initChannel(Channel ch) {ChannelPipeline pipeline = ch.pipeline();// 响应字符串pipeline.addLast(new StringEncoder());// 自定义ChannelInboundHandlerAdapterpipeline.addLast(handlerAdapter);}
I/O数据读写处理
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.time.LocalDateTime;
import java.util.Date;/*** I/O数据读写处理类* 客户端发送的消息 以及 回复客户端消息 均在此处* @ChannelHandler.Sharable 此注解用于在多个 Channel 中重复使用同一个 Handler 实例* @author wusiwee*/
@Slf4j
@ChannelHandler.Sharable
@Component
public class NettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{/*** 这里可以注入自己的service*/@Autowiredprivate IUserService iUserService;/*** 从客户端收到新的数据时,这个方法会在收到消息时被调用*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf in = (ByteBuf) msg;// 确保接收的数据长度足够,minimumLength 是所有字段长度的总和if (in.readableBytes() < MINIMUM_LENGTH) {ctx.writeAndFlush("报文长度过低,数据不完整"+"\n");return;}// 1,读取固定长度字符byte[] frameStart = new byte[4];in.readBytes(frameStart);String frameStartStr = new String(frameStart, java.nio.charset.StandardCharsets.UTF_8);log.info("1.解析:"+frameStartStr);ctx.writeAndFlush("I got it\n");}/*** 从客户端收到新的数据、读取完成时调用*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {log.info("读取完成 channelReadComplete");ctx.flush();}/*** 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.info("exceptionCaught");cause.printStackTrace();//抛出异常,断开与客户端的连接ctx.close();}/*** 客户端与服务端第一次建立连接时 执行*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);ctx.channel().read();InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();String clientIp = inSocket.getAddress().getHostAddress();//此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接log.info("客户端连接 channelActive{}", clientIp+" "+ctx.name());}/*** 客户端与服务端 断连时 执行*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();String clientIp = inSocket.getAddress().getHostAddress();//断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机ctx.close();log.info("channelInactive{}", clientIp);}/*** 服务端当read超时, 会调用这个方法*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {super.userEventTriggered(ctx, evt);InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();String clientIp = inSocket.getAddress().getHostAddress();//超时 断开连接ctx.close();log.info("userEventTriggered{}", clientIp);}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) {log.info("注册 channelRegistered");}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) {log.info("channelUnregistered");}@Overridepublic void channelWritabilityChanged(ChannelHandlerContext ctx) {log.info("channelWritabilityChanged");}
}
测试发送消息 并 接收服务端回复
@Testvoid contextLoads() {try {// 服务器地址String host = "127.0.0.1";// 服务器端口int port = 10100;// 要发送的消息String message = "7E7E010038401010123433004D02000B22";Socket socket = new Socket(host, port);// 获取输出流OutputStream outputStream = socket.getOutputStream();// 将字符串转换为字节数组byte[] data = message.getBytes();// 写入数据到输出流outputStream.write(data);// 刷新输出流,确保数据发送outputStream.flush();InputStream input = socket.getInputStream();//读取服务器返回的消息BufferedReader br = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));String mess = br.readLine();System.out.println("服务器回复:" + mess);input.close();outputStream.close();socket.close();}catch (Exception e){System.out.println("出现异常");}}
异步启动Netty
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;/*** 启动类*/
@SpringBootApplication
@EnableAsync
public class NettyApplication implements ApplicationRunner{/*** 启动springboot*/public static void main( String[] args ) {SpringApplication.run(NettyApplication.class, args);}/*** 创建独立线程池*/private final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS,new LinkedBlockingDeque<>(2),Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardOldestPolicy());/*** 注入Netty消息处理器*/@Resourceprivate NettyChannelInboundHandlerAdapter handlerAdapter;@Overridepublic void run(ApplicationArguments args) throws Exception {// 使用线程池 异步启动Netty服务器executorService.submit(() -> {try {// 启动netty,绑定端口号new NettyServer(handlerAdapter).bind();} catch (Exception e) {// 异常处理System.out.println("启动netty出现异常:"+e.getMessage());}});}
}
运行截图
回复客户端消息的代码片段
测试发送
客户端收到回复,断开连接
攀峰之高险,岂有崖颠;搏海之明辉,何来彼岸?前进不止,奋斗不息。