文章目录
- Pre
- 设计
- Code
- Bus接口
- 自定义注解 @Subscribe
- 同步EventBus
- 异步EventBus
- Subscriber注册表Registry
- Event广播Dispatcher
- 测试
- 简单的Subscriber
- 同步Event Bus
- 异步Event Bus
Pre
我们在日常的工作中,都会使用到MQ这种组件, 某subscriber在消息中间件上注册了某个topic(主题),当有消息发送到了该topic上之后,注册在该topic上的所有subscriber都将会收到消息。
如图所示【消息中间件的消息订阅与发布】
消息中间件的核心作用是提供系统之间的异步消息处理机制。它可以在一个系统完成操作后,通过提交消息到消息中间件,触发其他依赖系统的后续处理,而不需要等待后续处理完全结束。
使用消息中间件的好处有:
- 提高系统处理效率,系统之间可以异步并行处理
- 降低系统耦合,通过消息进行解耦
- 提高系统故障隔离能力,一个系统故障不会影响其他系统
今天我们来实现一个Java进程内部的消息中间件Event Bus,它可以用于进程内不同组件之间的异步消息通信。
设计
- Bus接口对外提供了几种主要的使用方式,比如post方法用来发送Event
- register方法用来注册Event接收者(Subscriber)接受响应事件
- EventBus采用同步的方式推送Event
- AsyncEventBus采用异步的方式(Thread-Per-Message)推送Event
- Registry注册表,主要用来记录对应的Subscriber以及受理消息的回调方法,回调方法用注解@Subscribe来标识。
- Dispatcher主要用来将event广播给注册表中监听了topic的Subscriber
Code
Bus接口
package com.artisan.busevent.intf;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world* @desc: Bus接口定义了EventBus的所有使用方法*/
public interface Bus {/*** 将某个对象注册到Bus上,从此之后该类就成为Subscriber了*/void register(Object subscriber);/*** 将某个对象从Bus上取消注册,取消注册之后就不会再接收到来自Bus的任何消息*/void unregister(Object subscriber);/*** 提交Event到默认的topic*/void post(Object event);/*** 提交Event到指定的topic*/void post(Object Event, String topic);/*** 关闭该bus*/void close();/*** 返回Bus的名称标识*/String getBusName();
}
Bus接口中定义了注册topic的方法和Event发送的方法,具体如下。
-
register(Object subscriber)
:将某个对象实例注册给Event Bus -
unregister(Object subscriber
):取消对该对象实例的注册,会在Event Bus的注册表(Registry)中将其移除 -
post(Object event)
:提交Event到Event Bus中,如果未指定topic则会将event广播给Event Bus默认的topic -
post(Object Event,String topic)
:提交Event的同时指定了topic -
close()
:销毁该Event Bus -
getBusName()
:返回该Event Bus的名称
自定义注解 @Subscribe
注册对象给Event Bus的时候需要指定接收消息时的回调方法,采用注解的方式进行Event回调
package com.artisan.busevent.annotations;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {String topic() default "default-topic";
}
@Subscribe
要求注解在类中的方法,注解时可指定topic,不指定的情况下为默认的topic(default-topic
)
同步EventBus
同步EventBus是最核心的一个类,它实现了Bus的所有功能,但是该类对Event的广播推送采用的是同步的方式.
package com.artisan.busevent.impl;import com.artisan.busevent.intf.Bus;
import com.artisan.busevent.intf.EventExceptionHandler;import java.util.concurrent.Executor;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EventBus implements Bus {/*** 用于维护Subscriber的注册表*/private final Registry registry = new Registry();/*** Event Bus的名字*/private String busName;/*** 默认的Event Bus的名字*/private final static String DEFAULT_BUS_NAME = "default";/*** 默认的topic的名字*/private final static String DEFAULT_TOPIC = "default-topic";/*** 用于分发广播消息到各个Subscriber的类*/private final Dispatcher dispatcher;public EventBus() {this(DEFAULT_BUS_NAME, null, Dispatcher.SEQ_EXECUTOR_SERVICE);}/*** @param busName*/public EventBus(String busName) {this(busName, null, Dispatcher.SEQ_EXECUTOR_SERVICE);}EventBus(String busName, EventExceptionHandler exceptionHandler, Executor executor) {this.busName = busName;this.dispatcher = Dispatcher.newDispatcher(exceptionHandler, executor);}public EventBus(EventExceptionHandler exceptionHandler) {this(DEFAULT_BUS_NAME, exceptionHandler, Dispatcher.SEQ_EXECUTOR_SERVICE);}/*** 将注册Subscriber的动作直接委托给Registry** @param subscriber*/@Overridepublic void register(Object subscriber) {this.registry.bind(subscriber);}/*** 解除注册同样委托给Registry** @param subscriber*/@Overridepublic void unregister(Object subscriber) {this.registry.unbind(subscriber);}/*** 提交Event到默认的topic** @param event*/@Overridepublic void post(Object event) {this.post(event, DEFAULT_TOPIC);}/*** 提交Event到指定的topic,具体的动作是由Dispatcher来完成的** @param event* @param topic*/@Overridepublic void post(Object event, String topic) {this.dispatcher.dispatch(this, registry, event, topic);}/*** 关闭销毁Bus*/@Overridepublic void close() {this.dispatcher.close();}/*** 返回Bus的名称** @return*/@Overridepublic String getBusName() {return this.busName;}
}
-
EventBus的构造除了名称之外,还需要有ExceptionHandler和Executor,后两个主要是给Dispatcher使用的。
-
registry和unregister都是通过Subscriber注册表来完成的。
-
Event的提交则是由Dispatcher来完成的。
-
Executor是使用JDK中的Executor接口,自定义的ThreadPool天生就是多线程并发执行任务的线程池,自带异步处理能力,但是无法做到同步任务处理,因此我们使用Executor可以任意扩展同步、异步的任务处理方式。
异步EventBus
如果想要使用异步的方式进行推送,可使用EventBus的子类AsyncEventBus 。
异步的EventBus比较简单,继承自同步Bus,然后将Thread-Per-Message用异步处理任务的Executor替换EventBus中的同步Executor即可
package com.artisan.busevent.impl;import com.artisan.busevent.intf.EventExceptionHandler;import java.util.concurrent.ThreadPoolExecutor;/*** @author 小工匠* @version 1.0 * @mark: show me the code , change the world*/
public class AsyncEventBus extends EventBus {AsyncEventBus(String busName, EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {super(busName, exceptionHandler, executor);}public AsyncEventBus(String busName, ThreadPoolExecutor executor) {this(busName, null, executor);}public AsyncEventBus(ThreadPoolExecutor executor) {this("default-async", null, executor);}public AsyncEventBus(EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {this("default-async", exceptionHandler, executor);}
}
重写了父类EventBus的构造函数,使用ThreadPoolExecutor替代Executor 。
Subscriber注册表Registry
注册表维护了topic和subscriber之间的关系,当有Event被post之后,Dispatcher需要知道该消息应该发送给哪个Subscriber的实例和对应的方法,Subscriber对象没有任何特殊要求,就是普通的类不需要继承任何父类或者实现任何接口
package com.artisan.busevent.impl;import com.artisan.busevent.annotations.Subscribe;
import com.artisan.busevent.relations.Subscriber;import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
class Registry {/*** 存储Subscriber集合和topic之间关系的map*/private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Subscriber>> subscriberContainer = new ConcurrentHashMap<>();public void bind(Object subscriber) {//获取Subscriber Object的方法集合然后进行绑定List<Method> subscribeMethods = getSubscribeMethods(subscriber);subscribeMethods.forEach(m -> tierSubscriber(subscriber, m));}public void unbind(Object subscriber) {//unbind为了提高速度,只对Subscriber进行失效操作subscriberContainer.forEach((key, queue) ->queue.forEach(s ->{if (s.getSubscribeObject() == subscriber) {s.setDisable(true);}}));}public ConcurrentLinkedQueue<Subscriber> scanSubscriber(final String topic) {return subscriberContainer.get(topic);}private void tierSubscriber(Object subscriber, Method method) {final Subscribe subscribe = method.getDeclaredAnnotation(Subscribe.class);String topic = subscribe.topic();//当某topic没有Subscriber Queue的时候创建一个subscriberContainer.computeIfAbsent(topic, key -> new ConcurrentLinkedQueue<>());//创建一个Subscriber并且加入Subscriber列表中subscriberContainer.get(topic).add(new Subscriber(subscriber, method));}private List<Method> getSubscribeMethods(Object subscriber) {final List<Method> methods = new ArrayList<>();Class<?> temp = subscriber.getClass();//不断获取当前类和父类的所有@Subscribe方法while (temp != null) {//获取所有的方法Method[] declaredMethods = temp.getDeclaredMethods();//只有public方法 &&有一个入参 &&最重要的是被@Subscribe标记的方法才符合回调方法Arrays.stream(declaredMethods).filter(m -> m.isAnnotationPresent(Subscribe.class)&& m.getParameterCount() == 1&& m.getModifiers() == Modifier.PUBLIC).forEach(methods::add);temp = temp.getSuperclass();}return methods;}
}
由于Registry是在Bus中使用的,不能暴露给外部,因此Registry被设计成了包可见的类,所设计的EventBus对Subscriber没有做任何限制,但是要接受event的回调则需要将方法使用注解@Subscribe进行标记(可指定topic),同一个Subscriber的不同方法通过@Subscribe注解之后可接受来自两个不同的topic消息,比如
/**
*非常普通的对象
*/
public class SimpleObject
{/***subscribe方法,比如使用@Subscribe标记,并且是void类型且有一个参数*/@Subscribe(topic = "artisan-topic")public void test2(Integer x){}@Subscribe(topic = "test-topic")public void test3(Integer x){}
}
SimpleObject的实例被注册到了Event Bus之后,test2和test3这两个方法将会被加入到注册表中,分别用来接受来自artisan-topic和test-topic的event
Event广播Dispatcher
Dispatcher的主要作用是将EventBus post的event推送给每一个注册到topic上的subscriber上,具体的推送其实就是执行被@Subscribe注解的方法.
package com.artisan.busevent.impl;import com.artisan.busevent.relations.Subscriber;
import com.artisan.busevent.intf.Bus;
import com.artisan.busevent.intf.EventContext;
import com.artisan.busevent.intf.EventExceptionHandler;import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class Dispatcher {private final Executor executorService;private final EventExceptionHandler exceptionHandler;public static final Executor SEQ_EXECUTOR_SERVICE = SeqExecutorService.INSTANCE;public static final Executor PRE_THREAD_EXECUTOR_SERVICE = PreThreadExecutorService.INSTANCE;private Dispatcher(Executor executorService, EventExceptionHandler exceptionHandler) {this.executorService = executorService;this.exceptionHandler = exceptionHandler;}public void dispatch(Bus bus, Registry registry, Object event, String topic) {//根据topic获取所有的Subscriber列表ConcurrentLinkedQueue<Subscriber> subscribers = registry.scanSubscriber(topic);if (null == subscribers) {if (exceptionHandler != null) {exceptionHandler.handle(new IllegalArgumentException("The topic " + topic + " not bind yet"),new BaseEventContext(bus.getBusName(), null, event));}return;}//遍历所有的方法,并且通过反射的方式进行方法调用subscribers.stream().filter(subscriber -> !subscriber.isDisable()).filter(subscriber ->{Method subscribeMethod = subscriber.getSubscribeMethod();Class<?> aClass = subscribeMethod.getParameterTypes()[0];return (aClass.isAssignableFrom(event.getClass()));}).forEach(subscriber -> realInvokeSubscribe(subscriber, event, bus));}private void realInvokeSubscribe(Subscriber subscriber, Object event, Bus bus) {Method subscribeMethod = subscriber.getSubscribeMethod();Object subscribeObject = subscriber.getSubscribeObject();executorService.execute(() ->{try {subscribeMethod.invoke(subscribeObject, event);} catch (Exception e) {if (null != exceptionHandler) {exceptionHandler.handle(e, new BaseEventContext(bus.getBusName(), subscriber, event));}}});}public void close() {if (executorService instanceof ExecutorService) {((ExecutorService) executorService).shutdown();}}static Dispatcher newDispatcher(EventExceptionHandler exceptionHandler, Executor executor) {return new Dispatcher(executor, exceptionHandler);}static Dispatcher seqDispatcher(EventExceptionHandler exceptionHandler) {return new Dispatcher(SEQ_EXECUTOR_SERVICE, exceptionHandler);}static Dispatcher perThreadDispatcher(EventExceptionHandler exceptionHandler) {return new Dispatcher(PRE_THREAD_EXECUTOR_SERVICE, exceptionHandler);}/*** 顺序执行的ExecutorService*/private static class SeqExecutorService implements Executor {private final static SeqExecutorService INSTANCE = new SeqExecutorService();@Overridepublic void execute(Runnable command) {command.run();}}/*** 每个线程负责一次消息推送*/private static class PreThreadExecutorService implements Executor {private final static PreThreadExecutorService INSTANCE = new PreThreadExecutorService();@Overridepublic void execute(Runnable command) {new Thread(command).start();}}/*** 默认的EventContext实现*/private static class BaseEventContext implements EventContext {private final String eventBusName;private final Subscriber subscriber;private final Object event;private BaseEventContext(String eventBusName, Subscriber subscriber, Object event) {this.eventBusName = eventBusName;this.subscriber = subscriber;this.event = event;}@Overridepublic String getSource() {return this.eventBusName;}@Overridepublic Object getSubscriber() {return subscriber != null ? subscriber.getSubscribeObject() : null;}@Overridepublic Method getSubscribe() {return subscriber != null ? subscriber.getSubscribeMethod() : null;}@Overridepublic Object getEvent() {return this.event;}}
}
在Dispatcher中,除了从Registry中获取对应的Subscriber执行之外,我们还定义了几个静态内部类,其主要是实现了Executor接口和EventContent接口。
测试
简单的Subscriber
package com.artisan.busevent.domains;import com.artisan.busevent.annotations.Subscribe;
import lombok.extern.slf4j.Slf4j;/*** @author 小工匠* @version 1.0 * @mark: show me the code , change the world*/
@Slf4j
public class SubscriberA {@Subscribepublic void methodA1(String message) {log.info("SubscriberA-->methodA1-->收到Message(字符串)--> {}", message);}@Subscribe(topic = "test")public void methodA2(String message) {log.info("SubscriberA-->methodA2-->收到Message-(对象)-> {}", message);}
}
package com.artisan.busevent.domains;import com.artisan.busevent.annotations.Subscribe;
import com.artisan.busevent.entity.Artisan;
import lombok.extern.slf4j.Slf4j;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
@Slf4j
public class SubscriberB {@Subscribepublic void methodB1(String message) {log.info("SubscriberB-->methodB1-->收到Message(字符串)--> {}", message);}@Subscribe(topic = "test")public void methodB2(Artisan artisan) {log.info("SubscriberB-->methodB-->收到Message(对象)--> {} ", artisan);}
}
package com.artisan.busevent.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.List;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/@Data
@NoArgsConstructor
@AllArgsConstructor()
public class Artisan {private String name;private Integer age;private List<String> hobbies;
}