【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)

基于Netty的分布式通信框架实现

  • 前提介绍
  • 本节重点
    • Dispatcher(分派调度器)
    • EventListener
      • ChannelEventListener
        • Channel通道事件
        • 定义ChannelActionEvent
    • Heartbeat、超时及重连机制

前提介绍

经过阅读《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)》,相信您已经对网络通信框架的网络通信层的实现原理和协议模型有了一定的认识和理解。

整体框架如下图所示:
在这里插入图片描述
对应的组件的基本功能和功能实现范畴。
在这里插入图片描述
在上一节,我们主要讲对应的Dispatcher上面之前的逻辑操作实现,进行了对应的介绍和分析:
在这里插入图片描述

  • Boss线程:接受连接流程,主要负责接受外部请求,这些请求可能是来自用户的操作或是其他服务的调用。一旦接收到请求,boss会进行必要的处理,然后将请求分发给下面的线程池worker进行处理。

  • Worker线程:系统中的工作执行者,负责接收boss分发的任务,然后执行具体的业务逻辑。这些任务可能涉及到数据的处理、服务的调用等。线程池worker通过channel与boss进行通信,确保任务能够准确无误地传递。

  • ChannelHandler处理器 :ChannelHandler 接口是一个空接口,其中:ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter是我们首先要实现和操作的基础。

本节重点

本节内容的重点是针对于Dispatcher分配和调度以及之后的操作流程的介绍和分析。

  • dispatcher机制:在worker执行任务的过程中,需要有一个机制来调度和分配任务。这就是dispatcher的作用。
    在这里插入图片描述

  • EventListener:基于在每个worker线程内部,eventListener发挥着关键作用。它负责监听和处理线程中的事件,比如任务的完成、异常等。通过eventListener,系统能够及时响应各种事件,进行必要的处理和反馈。

  • Service业务逻辑实现:它代表了整个系统的核心业务逻辑。service接收并处理来自worker线程的任务,完成具体的业务操作。这些操作可能涉及到数据的处理、服务的调用等。

Dispatcher(分派调度器)

Dispatcher根据一定的策略和规则,将任务分配给合适的worker线程进行处理。这一环节保证了系统的负载均衡和高效运行。
在这里插入图片描述
消息经过Pipline链处理后,将由Dispatcher转发,并进入EventListener链进行处理。Dispatcher内部使用了两个线程池:channelExecutor和dataExecutor。
在这里插入图片描述

  • netExecutor用于处理通道事件和异常事件。由于通道事件可能需要同步调用远程服务,因此该线程池没有设定上限,因为同步调用会阻塞当前线程。

  • dataExecutor用于处理消息事件。根据经验值,默认的最大线程数为150,但可以通过选项参数进行修改。

EventListener

ChannelEventListener

ChannelInboundHandler接口定义了一系列方法,用于处理Channel的入站事件。这些方法负责处理数据从外部系统(如网络)流入Channel的过程。这些方法都是将对应的事件(channelRegistered、channelUnregistered、channelActive、channelInactive)转发给ChannelPipeline中的下一个ChannelInboundHandler,如下面的源码所示:

    /*** Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/@Skip@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelRegistered();}/*** Calls {@link ChannelHandlerContext#fireChannelUnregistered()} to forward* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/@Skip@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelUnregistered();}/*** Calls {@link ChannelHandlerContext#fireChannelActive()} to forward* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/@Skip@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();}/*** Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward* to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.** Sub-classes may override this method to change behavior.*/@Skip@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelInactive();}
Channel通道事件

channelRegistered、channelUnregistered、channelActive和channelInactive这几个方法是用于处理不同类型的通道事件。下面分别对这几个方法进行详细分析:

  • channelRegistered(ChannelHandlerContext ctx): 这个方法在通道被注册到EventLoop(事件循环)后被调用。它的作用是将该事件转发给ChannelPipeline中的下一个ChannelInboundHandler。

  • channelUnregistered(ChannelHandlerContext ctx):这个方法在通道从EventLoop中注销后被调用。它的作用是将该事件转发给ChannelPipeline中的下一个ChannelInboundHandler。

  • channelActive(ChannelHandlerContext ctx):这个方法在通道变为活跃状态后被调用。它的作用是将该事件转发给ChannelPipeline中的下一个ChannelInboundHandler。

  • channelInactive(ChannelHandlerContext ctx):这个方法在通道变为非活跃状态后被调用。 它的作用是将该事件转发给ChannelPipeline中的下一个ChannelInboundHandler。
    在这里插入图片描述
    我们只需将相应的实现注入并发布 ChannelActionEvent 对象模型事件。这样,ChannelActionEvent 对象的消费者就能够监听事件并执行相应的逻辑操作。通过这种方式,我们实现了事件的发布与订阅机制,以便实现松耦合的组件间通信,并能根据实际需求对事件进行灵活地处理和扩展。

定义ChannelActionEvent

首先,定义一个自定义事件类 ChannelActionEventextends ,继承自 ApplicationEvent,主要作为通道变化的处理器事件。

public class ChannelActionEvent extends ApplicationEvent {private Object data;public ChannelActionEventextends (Object source, String data) {super(source);this.data = data;}public String getData() {return data;}
}@Component
public class ChannelActionEventListener extends implements ApplicationListener<ChannelActionEvent > {@Overridepublic void onApplicationEvent(MyEvent event) {Object data = event.getData();// 执行对应的逻辑操作System.out.println("Received event with data: " + data);}
}

因此,根据同样的逻辑,ExceptionEvent事件也可以通过方法处理器的exceptionCaught方法进行处理。
在这里插入图片描述

 @Skip@Override@SuppressWarnings("deprecation")public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {ctx.fireExceptionCaught(cause);}

DataEvent可以覆盖对应的channelRead、channelReadComplete的方法进行发布对应的事件处理即可。
在这里插入图片描述

  	@Skip@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.fireChannelRead(msg);}@Skip@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelReadComplete();}

框架会预先在 XXEventListener 链末端注册 ServiceMessageEventListener,该 Listener 负责调用被注册的 Service,并将
返回值或异常回传。
在这里插入图片描述

Heartbeat、超时及重连机制

Netty提供了读空闲和写空闲的功能来处理网络连接的空闲状态。

读空闲(Read Idle):当连接在指定的时间内没有接收到任何数据时,就会触发读空闲事件。这个事件可以用来检测连接是否处于空闲状态,或者判断通信对方是否还与服务器保持连接。通过设置ChannelOption.READ_IDLE_TIME参数来定义读空闲的时间。

写空闲(Write Idle):当连接在指定的时间内没有发送任何数据时,就会触发写空闲事件。这个事件可以用来定期发送心跳消息或其他需要保持连接的数据。通过设置ChannelOption.WRITE_IDLE_TIME参数来定义写空闲的时间。

在这里插入图片描述
在Netty中,可以通过ChannelOption设置读空闲和写空闲的时间,然后通过ChannelHandler的回调方法来处理空闲事件。常用的回调方法包括:

  • channelIdle(ChannelHandlerContext ctx, IdleStateEvent stateEvent):当发生空闲事件时调用该方法,可以在该方法中执行相应的逻辑操作。

通常,通过在管道中配置IdleStateHandler来启用空闲事件的检测和处理。

IdleStateHandler是Netty提供的一个特殊的ChannelHandler,用于检测并处理读空闲和写空闲事件。例如,可以在初始化管道时添加以下代码:

pipeline.addLast(new IdleStateHandler(0, 0, idleTime)); // 设置读写空闲时间
pipeline.addLast(new MyIdleHandler()); // 自定义的空闲事件处理器

在自定义的空闲事件处理器中,可以根据读空闲或写空闲事件执行相应的操作。例如,发送心跳消息、关闭连接等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/439032.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

短视频矩阵项目年后还能做吗/技术源头搭建分析

短视频矩阵项目年后还能做吗/技术源头搭建分析&#xff1a; 问&#xff1a;矩阵系统年后还可以迭代更新开发继续做吗&#xff1f; 答&#xff1a;可以的&#xff0c;企业依旧有需求 问&#xff1a;如何考察技术团队&#xff1f; 答&#xff1a;以下三个方面 一、是否是抖音…

2024 新年HTML5+Canvas制作3D烟花特效(附源码)

个人名片&#xff1a; &#x1f43c;作者简介&#xff1a;一名大三在校生&#xff0c;喜欢AI编程&#x1f38b; &#x1f43b;‍❄️个人主页&#x1f947;&#xff1a;落798. &#x1f43c;个人WeChat&#xff1a;hmmwx53 &#x1f54a;️系列专栏&#xff1a;&#x1f5bc;️…

时隔3年 | 微软 | Windows Server 2025 重磅发布

最新功能 以下是微软产品团队正在努力的方向&#xff1a; Windows Server 2025 为所有人提供的热补丁下一代 AD 活动目录和 SMB数据与存储Hyper-V 和人工智能还有更多… Ignite 发布视频 Windows Server 2025 Ignite Video 介绍 Windows Server 2022 正式发布日期是2021年…

【计算机视觉(CV)技术的优势和挑战】

背景&#xff1a; 在当今数字时代&#xff0c;博客成为了分享知识、展示个人专业能力和吸引读者的重要工具。然而&#xff0c;随着越来越多的博客涌现&#xff0c;如何优化博客的内容和用户体验成为了一个关键的问题。计算机视觉&#xff08;CV&#xff09;技术是一种利用计算机…

对于已交付(客户流失预警)模型的模型可解释LIME

目录 介绍&#xff1a; 数据&#xff1a; 数据处理&#xff1a; 随机森林建模&#xff1a; LIME 例一&#xff1a; 例二&#xff1a; 介绍&#xff1a; LIME (Local Interpretable Model-agnostic Explanations) 是一种解释机器学习模型的方法。它通过生成一个可解…

【AI_Design】Midjourney学习笔记

目录 后缀解析Promot合格使用prompt关键词描述 关键词化合作用关键词网站推荐 联合Chatgpt 后缀解析 –ar&#xff1a;宽高比设置–c&#xff1a;多样性设置&#xff08;数值0-100&#xff0c;默认值0&#xff09;–s&#xff1a;风格化设置&#xff08;数值0-1000&#xff0c…

Java 与 JavaScript的区别

Java 与 JavaScript的区别 Java 与 JavaScript&#xff1a;概述Java的特点JavaScript 的起源JavaScript 的特点Java 与 JavaScript&#xff0c;哪个更好&#xff1f;JavaScript 与 Java 相似吗&#xff1f;Java 与 JavaScript 的区别JavaScript 在服务器端的运行方式是怎样的&a…

《HTML 简易速速上手小册》第10章:HTML 的维护与优化(2024 最新版)

文章目录 10.1 网页性能优化10.1.1 基础知识10.1.2 案例 1&#xff1a;优化网页图像10.1.3 案例 2&#xff1a;使用延迟加载优化性能10.1.4 案例 3&#xff1a;优化 CSS 和 JavaScript 的加载 10.2 SEO 最佳实践10.2.1 基础知识10.2.2 案例 1&#xff1a;创建一个 SEO 友好的博…

Linux系统Shell脚本-----------正则表达式 、grep、 sed

一、正则表达式 1.前言 正则表达式(regular expression)描述了一种字符串匹配的模式&#xff08;pattern&#xff09;&#xff0c;可以用来检查一个串是否含有某种子串、将匹配的子串替换或者从某个串中取出符合某个条件的子串等。在Linux中也就是代表我们定义的模式模板&…

常用芯片学习——AMS1117芯片

AMS1117 1A 低压差线性稳压器 使用说明 AMS1117 是一款低压差线性稳压电路&#xff0c;该电路输出电流能力为1A。该系列电路包含固定输出电压版本和可调输出电压版本&#xff0c;其输出电压精度为士1.5%。为了保证芯片和电源系统的稳定性&#xff0c;XBLWAMS1117 内置热保护和…

基于Pytorch的DDP训练Mnist数据集

在前几期的博文中我们讲了pytorch的DDP&#xff0c;但是当时的demo是自制的虚拟数据集&#xff08;Pytorch分布式训练&#xff1a;DDP&#xff09;&#xff0c;这期文章我们使用Mnist数据集做测试&#xff0c;测试并完善代码。 快速开始 1. 我们修改一下main函数&#xff0c;在…

MySQL之索引使用原则详解(验证索引效率,SQL提示等)

索引使用 验证索引效率 在未建立索引之前&#xff0c;执行如下SQL语句&#xff0c;查看SQL的耗时 select * from tb_user where id 1; 针对字段创建索引 create index idx_sku_sn on tb_sku(sn) ; 针对于用户量大的表中&#xff0c;添加索引要比没有添加索引的字段查询…