Netty Review - Netty自动重连机制揭秘:原理与最佳实践

文章目录

  • 概述
  • Pre
  • 客户端自动重连
  • Code
    • Server
    • Client (重点)
  • 测试
    • 启动自动重连
    • 运行过程中断链后的自动重连

在这里插入图片描述


概述

在这里插入图片描述


Pre

Netty Review - 深入探讨Netty的心跳检测机制:原理、实战、IdleStateHandler源码分析


客户端自动重连

自动重连是一个用于提高网络应用稳定性和可靠性的功能。当客户端与服务器之间的连接意外断开时,客户端可以自动尝试重新连接到服务器,以确保数据的正常传输。

自动重连是指在网络通信中,当客户端与服务器之间的连接由于某种原因断开时,客户端能够自动尝试重新建立连接的机制。这是一种用于提高网络应用稳定性和可靠性的功能。

具体来说,当客户端检测到与服务器的连接中断时,它会自动发起新的连接尝试,以确保数据的正常传输。这对于处理网络不稳定性、临时断开或服务器重新启动等情况非常重要,可以减少用户干预,提升应用的用户体验。


Code

Server

package com.artisan.reconnect;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class ArtisanNettyServer {public static void main(String[] args) throws Exception {// 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup(8);try {// 创建服务器端的启动对象ServerBootstrap bootstrap = new ServerBootstrap();// 使用链式编程来配置参数bootstrap.group(bossGroup, workerGroup) //设置两个线程组// 使用NioServerSocketChannel作为服务器的通道实现.channel(NioServerSocketChannel.class)// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理.option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//对workerGroup的SocketChannel设置处理器//ch.pipeline().addLast(new LifeCycleInBoundHandler());ch.pipeline().addLast(new ArtisanNettyServerHandler());}});System.out.println("netty server start。。");// 绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况// 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕ChannelFuture cf = bootstrap.bind(9000).sync();// 给cf注册监听器,监听我们关心的事件/*cf.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (cf.isSuccess()) {System.out.println("监听端口9000成功");} else {System.out.println("监听端口9000失败");}}});*/// 等待服务端监听端口关闭,closeFuture是异步操作// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法cf.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
  1. EventLoopGroup:Netty使用EventLoopGroup来处理事件循环和IO操作。这里创建了两个EventLoopGroup,一个用于处理连接请求(bossGroup),另一个用于处理实际的业务逻辑(workerGroup)。
  2. ServerBootstrap:这是Netty的另一个核心组件,用于配置和初始化服务器。
  3. ChannelFuture:这是一个异步结果对象,用于表示通道操作的结果。
  4. ChannelInitializer:这是一个用于初始化新连接的处理器。
  5. ArtisanNettyServerHandler:这应该是一个自定义的处理类,用于处理业务逻辑,下文给出。
  6. bind()closeFuture()bind()方法用于启动服务器并绑定端口,closeFuture()用于等待服务器通道关闭。
  7. finally块:这里确保在服务器启动失败或成功后,EventLoopGroup会被优雅地关闭,以释放资源。

package com.artisan.reconnect;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world* @Description: 自定义Handler需要继承netty规定好的某个HandlerAdapter(规范)*/
public class ArtisanNettyServerHandler extends ChannelInboundHandlerAdapter {/*** 读取客户端发送的数据** @param ctx 上下文对象, 含有通道channel,管道pipeline* @param msg 就是客户端发送的数据* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("服务器读取线程 " + Thread.currentThread().getName());//Channel channel = ctx.channel();//ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站//将 msg 转成一个 ByteBuf,类似NIO 的 ByteBufferByteBuf buf = (ByteBuf) msg;System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));}/*** 数据读取完毕处理方法** @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));ctx.writeAndFlush(buf);}/*** 处理异常, 一般是需要关闭通道** @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

定义了一个自定义的Netty服务器处理器ArtisanNettyServerHandler,它继承自ChannelInboundHandlerAdapter。这个处理器包含了几个重要的方法来处理客户端的请求和响应:

  1. channelRead(ChannelHandlerContext ctx, Object msg):当服务器从客户端接收到数据时,这个方法会被调用。在这个方法中,你可以编写处理客户端发送的数据的逻辑。在这个例子中,它简单地打印了接收到的消息内容。
  2. channelReadComplete(ChannelHandlerContext ctx):这个方法在channelRead方法执行完成后被调用。在这个方法中,你可以发送响应给客户端。在这个例子中,它发送了一个简单的"HelloClient"消息给客户端。
  3. exceptionCaught(ChannelHandlerContext ctx, Throwable cause):这个方法在出现异常时被调用。在这个方法中,你可以编写异常处理的逻辑。在这个例子中,它简单地关闭了通道。

Client (重点)

这段代码是一个使用Netty框架的简单客户端示例,它实现了重连功能。以下是这段代码的中文注释解释:

package com.artisan.reconnect;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.TimeUnit;
/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world* @Description: 实现了重连的客户端*/
public class ArtisanNettyClient {private String host;private int port;private Bootstrap bootstrap;private EventLoopGroup group;public static void main(String[] args) throws Exception {ArtisanNettyClient artisanNettyClient = new ArtisanNettyClient("localhost", 9000);artisanNettyClient.connect();}public ArtisanNettyClient(String host, int port) {this.host = host;this.port = port;init();}private void init() {// 客户端需要一个事件循环组group = new NioEventLoopGroup();// 创建客户端启动对象// bootstrap 可重用, 只需在NettyClient实例化的时候初始化即可.bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 加入处理器ch.pipeline().addLast(new ArtisanNettyClientHandler(ArtisanNettyClient.this));}});}public void connect() throws Exception {System.out.println("netty client start。。");// 启动客户端去连接服务器端ChannelFuture cf = bootstrap.connect(host, port);cf.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {// 重连交给后端线程执行future.channel().eventLoop().schedule(() -> {System.err.println("重连服务端...");try {connect();} catch (Exception e) {e.printStackTrace();}}, 3000, TimeUnit.MILLISECONDS);} else {System.out.println("服务端连接成功...");}}});// 对通道关闭进行监听cf.channel().closeFuture().sync();}
}

这段代码定义了一个名为ArtisanNettyClient的类,它包含了客户端的初始化和连接逻辑。

  • EventLoopGroup:Netty使用EventLoopGroup来处理事件循环和IO操作。这里创建了一个NioEventLoopGroup,用于处理客户端的IO操作。
  • Bootstrap:这是Netty的另一个核心组件,用于配置和初始化客户端。
  • ChannelFuture:这是一个异步结果对象,用于表示通道操作的结果。
  • connect()方法:这个方法用于启动客户端并连接到服务器。如果连接失败,它将使用schedule方法在3秒后重试连接。
  • ArtisanNettyClientHandler:这应该是一个自定义的处理类,用于处理业务逻辑,但在这段代码中没有给出具体实现。
  • init()方法:这个方法用于初始化客户端的BootstrapEventLoopGroup
  • operationComplete()方法:这是ChannelFutureListener的回调方法,用于处理连接操作的结果。如果连接失败,它将安排重试连接。如果连接成功,它将打印成功消息。
  • closeFuture().sync():这个方法用于等待客户端通道关闭,确保在客户端关闭之前完成所有必要的清理工作。

这个示例中,客户端将尝试连接到指定的服务器地址和端口,如果连接失败,它将自动重试连接。


package com.artisan.reconnect;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class ArtisanNettyClientHandler extends ChannelInboundHandlerAdapter {private ArtisanNettyClient artisanNettyClient;public ArtisanNettyClientHandler(ArtisanNettyClient artisanNettyClient) {this.artisanNettyClient = artisanNettyClient;}/*** 当客户端连接服务器完成就会触发该方法** @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 创建一个ByteBuf,包含"HelloServer"的字符串ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));// 向服务器发送消息ctx.writeAndFlush(buf);}/*** 当通道有读取事件时会触发,即服务端发送数据给客户端** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 获取消息内容ByteBuf buf = (ByteBuf) msg;// 打印服务端发送的消息System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));// 打印服务端的地址System.out.println("服务端的地址: " + ctx.channel().remoteAddress());}/*** 当通道处于不活动状态时调用** @param ctx* @throws Exception*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// 打印提示信息System.err.println("运行中断开重连。。。");// 调用客户端的connect方法进行重连artisanNettyClient.connect();}/*** 当捕获到异常时调用** @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {// 打印异常堆栈信息cause.printStackTrace();// 关闭通道ctx.close();}
}

这个处理类包含了一些基本的方法来处理通道的激活、读取、不活动状态和异常。以下是每个方法的简要说明:

  • channelActive():当客户端成功连接到服务器时,这个方法会被调用,并向服务器发送一条消息。
  • channelRead():当客户端从服务器接收到消息时,这个方法会被调用,并打印出接收到的消息内容和服务器的地址。
  • channelInactive()当通道不再活跃时(例如,连接被断开),这个方法会被调用,并尝试重新连接服务器。
  • exceptionCaught():当捕获到异常时,这个方法会被调用,并打印异常的堆栈跟踪信息,然后关闭通道。

这个处理类是客户端逻辑的一部分,它负责处理客户端与服务器之间的交互。


测试

启动自动重连

先启动客户端哈(务必) , 再启动服务端,来验证下 客户端的自动重连 。

起客户端,不起服务端

在这里插入图片描述

起服务端

在这里插入图片描述


运行过程中断链后的自动重连

系统运行过程中网络故障或服务端故障,导致客户端与服务端断开连接了也需要重连,可以在客户端处理数据的Handler的channelInactive方法中进行重连。

运行过程中,请将服务端的连接断开,过一会儿再启动,验证客户端在运行过程中的自动重连

断开服务端

在这里插入图片描述

恢复服务端

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/297654.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java 第16章 坦克大战1-2 思路整理

文章目录 1 绘制坦克坦克画板Panel画框Frame 2 让坦克动起来画板Panel 3 本章作业 1 绘制坦克 坦克 不同坦克有共同属性&#xff0c;可以先提取共同特征&#xff08;坐标位置&#xff09;作为父类&#xff0c;然后其他坦克类继承它。 画板Panel 有坦克在画板上显示&#xf…

Openai的openai新版本调用方式

最近大家有没有发现Openai的openai已经更新到1.6.1了,而且API的调用方式发生了巨大的变化,下面来看看openai新的调用方式吧。 欢迎关注公众号 module ‘openai’ has no attribute ChatCompletion. 提示openai的版本过低。(pip install -U openai) 1. Chat API from openai…

4.7 【共享源】流的生产者(二)

七,模式 流的模式决定了Screen如何使前台缓冲区可用。生产者通过调用screen_set_stream_property_iv()并设置SCREEN_PROPERTY_MODE属性来设置模式。有效模式如下: 7.1 SCREEN_STREAM_MODE_DEFAULT 如果生产者应用程序没有在流上明确设置 SCREEN_PROPERTY_MODE 属性,则 Sc…

七天搞定java接口自动化测试实战,一文搞定...

前言 无论是自动化测试还是自动化部署&#xff0c;撸码肯定少不了&#xff0c;所以下面的基于java语言的接口自动化测试&#xff0c;要想在业务上实现接口自动化&#xff0c;前提是要有一定的java基础。 如果没有java基础&#xff0c;也没关系。这里小编也为大家提供了一套jav…

电子科大软件系统架构设计——软件建模详细设计

文章目录 软件建模详细设计概述软件建模详细设计目标软件建模详细设计原则开闭原则里氏 (Liskov) 替换原则依赖倒置原则接口分离原则单一职责原则最少知识原则&#xff08;迪米特法则&#xff09;高内聚原则松耦合原则可重用原则 软件建模详细设计内容 UML 软件静态结构视图建模…

vitis HLS中实现canny算法的IP核

一、前言 canny边缘检测主要用于提取图像的边缘&#xff0c;是最常用且有效的边缘检测算法。在AMD赛灵思提供的库函数中&#xff0c;使用xf::cv::Canny和xf::cv::EdgeTracing两个函数实现canny边缘提取。本文举例说明如何在vitis HLS 2023.1中实现canny算法。 二、xf::cv::Cann…

【数据结构】布隆过滤器原理详解及其代码实现

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能AI、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推荐--…

luceda ipkiss教程 53:在版图上加中文

要在版图上加中文&#xff0c;如&#xff1a; 可以通过如下方法实现&#xff1a; 首先&#xff0c;可以在ppt中加入文本框&#xff0c;在文本框中输入想要加到版图上的中文内容&#xff0c;如&#xff0c;复旦大学&#xff0c;并将文本框存为windows位图。 其次&#xff0c;通…

基于Java (spring-boot)的仓库管理系统

一、项目介绍 本系统的使用者一共有系统管理员、仓库管理员和普通用户这3种角色: 1.系统管理员&#xff1a;通过登录系统后&#xff0c;可以进行管理员和用户信息的管理、仓库和物品分类的管理&#xff0c;以及操作日志的查询&#xff0c;具有全面的系统管理权限。 2.仓库管理…

现代控制理论-李雅普诺夫

现代控制理论-李雅普诺夫 单输入单输出系统&#xff08;BIBO&#xff09;的系统函数如下&#xff1a; 则&#xff0c;该系统的能控标准型&#xff08;能空性&#xff09;为&#xff1a; 能观性&#xff1a; 李雅普诺夫下的稳定性&#xff1a; 李雅普诺夫下的渐进稳定性&a…

Docker部署 flowable-ui 进行流程建模

Docker部署 flowable-ui 进行流程建模 简介 安装Docker Desktop,本篇无安装步骤安装正常打开运行后&#xff0c;正式开始部署flowable-uicmd执行拉取镜像操作docker pull flowable/flowable-uicmd启动镜像docker run -d --name flowable -p 8081:8080 flowable/flowable-ui修…

【C++11/17】std::map高效插入

我们在使用stl的映射容器std::map时&#xff0c;经常需要向容器中插入数据。由于map的元素key值是唯一的&#xff0c;我们经常遇到这样的场景&#xff1a; 向map中插入元素时&#xff0c;指定的key已经存在则直接更新&#xff1b;指定的key不存在&#xff0c;然后才做插入操作…