死磕Nacos系列:Nacos事件发布订阅模型

前言

在Nacos源码中,你是否也经常看到NotifyCenter.publishEvent这样的代码块?

这个事件发布出去后,有哪些类接收到通知并进行了逻辑处理呢?

这里面的实现逻辑是什么呢?

如果你不太清楚,那我们一起来梳理下。Let’s go!

NotifyCenter

如果我们不靠其它资料,从代码层面去解读的话,还是得先从这个类入手,因为都是调用其静态方法完成通知的。

这个类的静态代码块里面有这样的逻辑:

static {// 从系统环境中取"nacos.core.notify.ring-buffer-size",如果没有设置,则取默认值16384String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);// 从系统环境中取"nacos.core.notify.share-buffer-size",如果没有设置,则取默认值1024String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);// 通过SPI的方式加载EventPublisher的实现类final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);Iterator<EventPublisher> iterator = publishers.iterator();if (iterator.hasNext()) {clazz = iterator.next().getClass();} else {clazz = DefaultPublisher.class;}// 创建一个默认的EventPublisherFactory,用来产生EventPublisherDEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {try {EventPublisher publisher = clazz.newInstance();publisher.init(cls, buffer);return publisher;} catch (Throwable ex) {LOGGER.error("Service class newInstance has error : ", ex);throw new NacosRuntimeException(SERVER_ERROR, ex);}};try {// 创建一个默认共享的EventPublisherINSTANCE.sharePublisher = new DefaultSharePublisher();INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);} catch (Throwable ex) {LOGGER.error("Service class newInstance has error : ", ex);}// 添加一个ShutdownHookThreadUtils.addShutdownHook(NotifyCenter::shutdown);
}

从这里我们可以知道,有EventPublisherFactoryEventPublisher的概念,在EventPublisher中,还专门创建了一个DefaultSharePublisher,还知道了一个事件叫做SlowEvent

那我们就看一下这几个模型:

模型

EventPublisherFactory

public interface EventPublisherFactory extends BiFunction<Class<? extends Event>, Integer, EventPublisher> {/*** Build an new {@link EventPublisher}.** @param eventType    eventType for {@link EventPublisher}* @param maxQueueSize max queue size for {@link EventPublisher}* @return new {@link EventPublisher}*/@OverrideEventPublisher apply(Class<? extends Event> eventType, Integer maxQueueSize);
}

可以看到,接口中就一个方法,就是通过事件类型和最大队列大小来得到一个EventPublisher

image-20231125165128331

其中有NamingEventPublisherFactoryTraceEventPublisherFactory两个实现类。

EventPublisher

public interface EventPublisher extends Closeable {// 初始化EventPublishervoid init(Class<? extends Event> type, int bufferSize);// 获取当前事件的数量long currentEventSize();// 添加一个订阅者void addSubscriber(Subscriber subscriber);// 删除一个订阅者void removeSubscriber(Subscriber subscriber);// 发布事件boolean publish(Event event);// 通知订阅者void notifySubscriber(Subscriber subscriber, Event event);}

其类图如下:

image-20231125165544405

这里SharedEventPublisher看起来是做了更多的扩展,来看看呢:

public interface ShardedEventPublisher extends EventPublisher {// 给指定的一个事件添加订阅者void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType);// 给指定的一个事件删除订阅者void removeSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType);
}

这里新加入了个Subscriber,那我们来看看:

Subscriber

public abstract class Subscriber<T extends Event> {// 接收事件public abstract void onEvent(T event);// 获取当前订阅的事件类型public abstract Class<? extends Event> subscribeType();// 获取异步处理器public Executor executor() {return null;}// 忽略失效的事件public boolean ignoreExpireEvent() {return false;}// 获取事件是否匹配作用域public boolean scopeMatches(T event) {return true;}
}

有个特殊的SmartSubscriber,它是Subscriber的子类,是一个抽象类:

public abstract class SmartSubscriber extends Subscriber {// 返回订阅的事件列表public abstract List<Class<? extends Event>> subscribeTypes();@Overridepublic final Class<? extends Event> subscribeType() {return null;}// 默认不忽略过期的事件@Overridepublic final boolean ignoreExpireEvent() {return false;}
}

Event

public abstract class Event implements Serializable {private static final long serialVersionUID = -3731383194964997493L;private static final AtomicLong SEQUENCE = new AtomicLong(0);private final long sequence = SEQUENCE.getAndIncrement();// 获取事件序号public long sequence() {return sequence;}// 获取事件作用域public String scope() {return null;}// 判断是否是一个插件的事件public boolean isPluginEvent() {return false;}
}

SlowEvent中,仅仅是将sequence统一默认为0。

public abstract class SlowEvent extends Event {@Overridepublic long sequence() {return 0;}
}

综上,我们通过接口关系就可以大概得出一个结论:

EventPublisher可通过EventPublisherFactory创建得来,EventPublisher可以给Event添加Subscriber,并发布事件让对应的Subscriber执行。这不妥妥的观察者模式嘛!

我们来画张图:

image-20231125171510677

了解了NotifyCenter大概的模型之后,我们来看看内部的实现。

实现方法

registerToPublisher

给时间注册一个EventPublisher。

public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,final EventPublisherFactory factory, final int queueMaxSize) {if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher;}final String topic = ClassUtils.getCanonicalName(eventType);synchronized (NotifyCenter.class) {// MapUtils.computeIfAbsent is a unsafe method.MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);}return INSTANCE.publisherMap.get(topic);
}

诶,如果事件是SlowEvent,就会统一返回一个sharePublisher,这个就是静态代码块中初始化的DefaultSharePublisher

好,(敲黑板),所有SlowEvent的事件共用一个DefaultSharePublisher

接着给NotifyCenter上锁,如果publisherMap中没有当前事件的EventPublisher,那就用EventPublisherFactory创建一个EventPublisher,并存入publisherMap中,再返回。

registerSubscriber

给事件注册一个订阅者。

public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {// 如果订阅者是一个SmartSubscriber,可以订阅多个事件if (consumer instanceof SmartSubscriber) {for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {// 如果事件是SlowEvent,则添加到sharePublisher中if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);} else {// 其他事件则添加到对应的EventPublisher中addSubscriber(consumer, subscribeType, factory);}}return;}// 其他类型的Subscriberfinal Class<? extends Event> subscribeType = consumer.subscribeType();if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {// 如果事件是SlowEvent,则添加到sharePublisher中INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);return;}// 其他事件则添加到对应的EventPublisher中addSubscriber(consumer, subscribeType, factory);
}

publishEvent

发送事件。

private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {\// 如果事件是SlowEvent,则用sharePublisher发送if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);}final String topic = ClassUtils.getCanonicalName(eventType);// 根据事件找到对应的EventPublisher进行发送                                                  EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {return publisher.publish(event);}if (event.isPluginEvent()) {return true;}LOGGER.warn("There are no [{}] publishers for this event, please register", topic);return false;
}

总结

Nacos的事件发布是基于观察者模式进行设计的。

如果一个处理器需要接收到某个特定事件的通知,那么就要先通过NotifyCenter来订阅自己感兴趣的Event,当显示调用NotifyCenter.publishEvent时,会通过EventPublisher进行通知到订阅者。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/219211.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2024年天津天狮学院食品质量与安全专业《普通化学》考试大纲

2024年天津天狮学院食品质量与安全专业高职升本入学考试《普通化学》考试大纲 一、考试性质 《普通化学》专业课程考试是天津天狮学院食品质量与安全专业高职升本入学考试 的必考科目之一&#xff0c;其性质是考核学生是否达到了升入本科继续学习的要求而进行的选拔性考试。《…

大数据面试大厂真题【附答案详细解析】

1.Java基础篇&#xff08;阿里、蚂蚁、字节、携程、快手、杭州银行等&#xff09; 问题&#xff1a;HashMap的底层实现原理 答案&#xff1a; 在jdk1.8之前&#xff0c;hashmap由 数组-链表数据结构组成&#xff0c;在jdk1.8之后hashmap由 数组-链表-红黑树数据结构组成&…

再见 Pandas,再见算法

大家好,《再见pandas》 系列已有200多位朋友加入学习了,这段时间亲眼见证了很多朋友的飞跃进步,从无到有,从一个问问题的小白到开始慢慢回答别人的问题,在讨论和练习中不断成长。虽说pandas已经很普及了,但普及内容的深度却远远不够。 下面这套原创图文是我和几位小伙伴…

cjson库打包数据实现方法

使用 cJson 库&#xff0c;在C语言环境下&#xff0c;打包一个cJson字符串&#xff1a; int CreateArryJsonString(void) {cJSON *cJsonArr cJSON_CreateArray();cJSON *sJsonObj1 cJSON_CreateObject();cJSON_AddStringToObject(sJsonObj1, "test1", "test1…

WIFI模块(esp-01s)获取网络时间与天气信息

目录 一、硬件连接 二、获取网络时间 1、AT指令集 2、具体操作 三、获取天气信息 1、心知天气注册 2、AT指令集 3、具体操作 4、json格式检查 一、硬件连接 WiFi模块的RX连接TTL模块的TX&#xff0c; WiFi模块的TX连接TTL模块的RX&#xff0c;电源与地接对。 插入电脑…

SQL Server秘籍:数据分隔解密,数据库处理新境界!

点击上方蓝字关注我 在数据数据过程中经常会遇到数据按照一定字符进行拆分&#xff0c;而在不同版本的SQL SERVER数据库中由于包含的函数不同&#xff0c;处理的方式也不一样。本文将列举2个版本的数据库中不同的处理方法。 1. 使用 XML 方法 在SQL SERVER 2016版本之前&#x…

11 月 25 日 ROS 学习笔记——3D 建模与仿真

文章目录 前言一、在 ROS 中自定义机器人的3D模型1. 在 rviz 里查看3D模型2. xacro 二、Gazebo1. urdf 集成 gazebo2. 综合应用1). 运动控制及里程计2). 雷达仿真3). 摄像头信息仿真4). kinect 深度相机仿真5). 点云 前言 本文为11 月 25 日 ROS 学习笔记——3D 建模与仿真&am…

Self Distillation 自蒸馏论文解读

paper&#xff1a;Be Your Own Teacher: Improve the Performance of Convolutional Neural Networks via Self Distillation official implementation&#xff1a; https://github.com/luanyunteng/pytorch-be-your-own-teacher 前言 知识蒸馏作为一种流行的压缩方法&#…

四、IDEA创建项目时,Maven Archetype模板工程说明

什么是Maven Archetype Archetype是一个Maven项目的模板工具包&#xff0c;它定义了一类项目的基本架构。Archetype为开发人员提供了创建Maven项目的模板&#xff0c;同时它也可以根据已有的Maven项目生成参数化的模板。 官方文档&#xff1a;https://maven.apache.org/archet…

pytorch导出rot90算子至onnx

如何导出rot90算子至onnx 1 背景描述2 等价替换2.1 rot90替换(NCHW)2.2 rot180替换(NCHW)2.3 rot270替换(NCHW) 3 rot导出ONNX 1 背景描述 在部署模型时&#xff0c;如果某些模型中或者前后处理中含有rot90算子&#xff0c;但又希望一起和模型导出onnx时&#xff0c;可能会遇到…

【二叉树】oj题

在处理oj题之前我们需要先处理一下之前遗留的问题 在二叉树中寻找为x的节点 BTNode* BinaryTreeFind(BTNode* root, int x) {if (root NULL)return NULL;if (root->data x)return root;BTNode* ret1 BinaryTreeFind(root->left, x);BTNode* ret2 BinaryTreeFind(ro…

【云原生】什么是 Kubernetes ?

什么是 Kubernetes &#xff1f; Kubernetes 是一个开源容器编排平台&#xff0c;管理着一系列的 主机 或者 服务器&#xff0c;它们被称作是 节点&#xff08;Node&#xff09;。 每一个节点运行了若干个相互独立的 Pod。 Pod 是 Kubernetes 中可以部署的 最小执行单元&#x…