前言
在Nacos源码中,你是否也经常看到NotifyCenter.publishEvent
这样的代码块?
这个事件发布出去后,有哪些类接收到通知并进行了逻辑处理呢?
这里面的实现逻辑是什么呢?
如果你不太清楚,那我们一起来梳理下。Let’s go!
NotifyCenter
如果我们不靠其它资料,从代码层面去解读的话,还是得先从这个类入手,因为都是调用其静态方法完成通知的。
这个类的静态代码块里面有这样的逻辑:
static {// 从系统环境中取"nacos.core.notify.ring-buffer-size",如果没有设置,则取默认值16384String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);// 从系统环境中取"nacos.core.notify.share-buffer-size",如果没有设置,则取默认值1024String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);// 通过SPI的方式加载EventPublisher的实现类final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);Iterator<EventPublisher> iterator = publishers.iterator();if (iterator.hasNext()) {clazz = iterator.next().getClass();} else {clazz = DefaultPublisher.class;}// 创建一个默认的EventPublisherFactory,用来产生EventPublisherDEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {try {EventPublisher publisher = clazz.newInstance();publisher.init(cls, buffer);return publisher;} catch (Throwable ex) {LOGGER.error("Service class newInstance has error : ", ex);throw new NacosRuntimeException(SERVER_ERROR, ex);}};try {// 创建一个默认共享的EventPublisherINSTANCE.sharePublisher = new DefaultSharePublisher();INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);} catch (Throwable ex) {LOGGER.error("Service class newInstance has error : ", ex);}// 添加一个ShutdownHookThreadUtils.addShutdownHook(NotifyCenter::shutdown);
}
从这里我们可以知道,有EventPublisherFactory
和EventPublisher
的概念,在EventPublisher
中,还专门创建了一个DefaultSharePublisher
,还知道了一个事件叫做SlowEvent
。
那我们就看一下这几个模型:
模型
EventPublisherFactory
public interface EventPublisherFactory extends BiFunction<Class<? extends Event>, Integer, EventPublisher> {/*** Build an new {@link EventPublisher}.** @param eventType eventType for {@link EventPublisher}* @param maxQueueSize max queue size for {@link EventPublisher}* @return new {@link EventPublisher}*/@OverrideEventPublisher apply(Class<? extends Event> eventType, Integer maxQueueSize);
}
可以看到,接口中就一个方法,就是通过事件类型和最大队列大小来得到一个EventPublisher
。
其中有NamingEventPublisherFactory
和TraceEventPublisherFactory
两个实现类。
EventPublisher
public interface EventPublisher extends Closeable {// 初始化EventPublishervoid init(Class<? extends Event> type, int bufferSize);// 获取当前事件的数量long currentEventSize();// 添加一个订阅者void addSubscriber(Subscriber subscriber);// 删除一个订阅者void removeSubscriber(Subscriber subscriber);// 发布事件boolean publish(Event event);// 通知订阅者void notifySubscriber(Subscriber subscriber, Event event);}
其类图如下:
这里SharedEventPublisher
看起来是做了更多的扩展,来看看呢:
public interface ShardedEventPublisher extends EventPublisher {// 给指定的一个事件添加订阅者void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType);// 给指定的一个事件删除订阅者void removeSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType);
}
这里新加入了个Subscriber,那我们来看看:
Subscriber
public abstract class Subscriber<T extends Event> {// 接收事件public abstract void onEvent(T event);// 获取当前订阅的事件类型public abstract Class<? extends Event> subscribeType();// 获取异步处理器public Executor executor() {return null;}// 忽略失效的事件public boolean ignoreExpireEvent() {return false;}// 获取事件是否匹配作用域public boolean scopeMatches(T event) {return true;}
}
有个特殊的SmartSubscriber
,它是Subscriber
的子类,是一个抽象类:
public abstract class SmartSubscriber extends Subscriber {// 返回订阅的事件列表public abstract List<Class<? extends Event>> subscribeTypes();@Overridepublic final Class<? extends Event> subscribeType() {return null;}// 默认不忽略过期的事件@Overridepublic final boolean ignoreExpireEvent() {return false;}
}
Event
public abstract class Event implements Serializable {private static final long serialVersionUID = -3731383194964997493L;private static final AtomicLong SEQUENCE = new AtomicLong(0);private final long sequence = SEQUENCE.getAndIncrement();// 获取事件序号public long sequence() {return sequence;}// 获取事件作用域public String scope() {return null;}// 判断是否是一个插件的事件public boolean isPluginEvent() {return false;}
}
在SlowEvent
中,仅仅是将sequence统一默认为0。
public abstract class SlowEvent extends Event {@Overridepublic long sequence() {return 0;}
}
综上,我们通过接口关系就可以大概得出一个结论:
EventPublisher
可通过EventPublisherFactory
创建得来,EventPublisher
可以给Event
添加Subscriber
,并发布事件让对应的Subscriber
执行。这不妥妥的观察者模式嘛!
我们来画张图:
了解了NotifyCenter
大概的模型之后,我们来看看内部的实现。
实现方法
registerToPublisher
给时间注册一个EventPublisher。
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,final EventPublisherFactory factory, final int queueMaxSize) {if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher;}final String topic = ClassUtils.getCanonicalName(eventType);synchronized (NotifyCenter.class) {// MapUtils.computeIfAbsent is a unsafe method.MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);}return INSTANCE.publisherMap.get(topic);
}
诶,如果事件是SlowEvent
,就会统一返回一个sharePublisher
,这个就是静态代码块中初始化的DefaultSharePublisher
。
好,(敲黑板),所有SlowEvent
的事件共用一个DefaultSharePublisher
。
接着给NotifyCenter
上锁,如果publisherMap
中没有当前事件的EventPublisher
,那就用EventPublisherFactory
创建一个EventPublisher
,并存入publisherMap
中,再返回。
registerSubscriber
给事件注册一个订阅者。
public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {// 如果订阅者是一个SmartSubscriber,可以订阅多个事件if (consumer instanceof SmartSubscriber) {for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {// 如果事件是SlowEvent,则添加到sharePublisher中if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);} else {// 其他事件则添加到对应的EventPublisher中addSubscriber(consumer, subscribeType, factory);}}return;}// 其他类型的Subscriberfinal Class<? extends Event> subscribeType = consumer.subscribeType();if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {// 如果事件是SlowEvent,则添加到sharePublisher中INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);return;}// 其他事件则添加到对应的EventPublisher中addSubscriber(consumer, subscribeType, factory);
}
publishEvent
发送事件。
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {\// 如果事件是SlowEvent,则用sharePublisher发送if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);}final String topic = ClassUtils.getCanonicalName(eventType);// 根据事件找到对应的EventPublisher进行发送 EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {return publisher.publish(event);}if (event.isPluginEvent()) {return true;}LOGGER.warn("There are no [{}] publishers for this event, please register", topic);return false;
}
总结
Nacos的事件发布是基于观察者模式进行设计的。
如果一个处理器需要接收到某个特定事件的通知,那么就要先通过NotifyCenter
来订阅自己感兴趣的Event
,当显示调用NotifyCenter.publishEvent
时,会通过EventPublisher
进行通知到订阅者。