并发编程 - Event Bus 设计模式

文章目录

  • Pre
  • 设计
  • Code
    • Bus接口
    • 自定义注解 @Subscribe
    • 同步EventBus
    • 异步EventBus
    • Subscriber注册表Registry
    • Event广播Dispatcher
  • 测试
    • 简单的Subscriber
    • 同步Event Bus
    • 异步Event Bus

在这里插入图片描述

Pre

我们在日常的工作中,都会使用到MQ这种组件, 某subscriber在消息中间件上注册了某个topic(主题),当有消息发送到了该topic上之后,注册在该topic上的所有subscriber都将会收到消息。

在这里插入图片描述

如图所示【消息中间件的消息订阅与发布】

消息中间件的核心作用是提供系统之间的异步消息处理机制。它可以在一个系统完成操作后,通过提交消息到消息中间件,触发其他依赖系统的后续处理,而不需要等待后续处理完全结束。

使用消息中间件的好处有:

  • 提高系统处理效率,系统之间可以异步并行处理
  • 降低系统耦合,通过消息进行解耦
  • 提高系统故障隔离能力,一个系统故障不会影响其他系统

今天我们来实现一个Java进程内部的消息中间件Event Bus,它可以用于进程内不同组件之间的异步消息通信。


设计

  • Bus接口对外提供了几种主要的使用方式,比如post方法用来发送Event
  • register方法用来注册Event接收者(Subscriber)接受响应事件
  • EventBus采用同步的方式推送Event
  • AsyncEventBus采用异步的方式(Thread-Per-Message)推送Event
  • Registry注册表,主要用来记录对应的Subscriber以及受理消息的回调方法,回调方法用注解@Subscribe来标识。
  • Dispatcher主要用来将event广播给注册表中监听了topic的Subscriber

在这里插入图片描述


Code

在这里插入图片描述

Bus接口

package com.artisan.busevent.intf;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world* @desc: Bus接口定义了EventBus的所有使用方法*/
public interface Bus {/*** 将某个对象注册到Bus上,从此之后该类就成为Subscriber了*/void register(Object subscriber);/*** 将某个对象从Bus上取消注册,取消注册之后就不会再接收到来自Bus的任何消息*/void unregister(Object subscriber);/*** 提交Event到默认的topic*/void post(Object event);/*** 提交Event到指定的topic*/void post(Object Event, String topic);/*** 关闭该bus*/void close();/*** 返回Bus的名称标识*/String getBusName();
}

Bus接口中定义了注册topic的方法和Event发送的方法,具体如下。

  • register(Object subscriber):将某个对象实例注册给Event Bus

  • unregister(Object subscriber):取消对该对象实例的注册,会在Event Bus的注册表(Registry)中将其移除

  • post(Object event):提交Event到Event Bus中,如果未指定topic则会将event广播给Event Bus默认的topic

  • post(Object Event,String topic):提交Event的同时指定了topic

  • close():销毁该Event Bus

  • getBusName():返回该Event Bus的名称


自定义注解 @Subscribe

注册对象给Event Bus的时候需要指定接收消息时的回调方法,采用注解的方式进行Event回调

package com.artisan.busevent.annotations;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {String topic() default "default-topic";
}

@Subscribe要求注解在类中的方法,注解时可指定topic,不指定的情况下为默认的topic(default-topic


同步EventBus

同步EventBus是最核心的一个类,它实现了Bus的所有功能,但是该类对Event的广播推送采用的是同步的方式.

package com.artisan.busevent.impl;import com.artisan.busevent.intf.Bus;
import com.artisan.busevent.intf.EventExceptionHandler;import java.util.concurrent.Executor;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class EventBus implements Bus {/*** 用于维护Subscriber的注册表*/private final Registry registry = new Registry();/*** Event Bus的名字*/private String busName;/*** 默认的Event Bus的名字*/private final static String DEFAULT_BUS_NAME = "default";/*** 默认的topic的名字*/private final static String DEFAULT_TOPIC = "default-topic";/*** 用于分发广播消息到各个Subscriber的类*/private final Dispatcher dispatcher;public EventBus() {this(DEFAULT_BUS_NAME, null, Dispatcher.SEQ_EXECUTOR_SERVICE);}/*** @param busName*/public EventBus(String busName) {this(busName, null, Dispatcher.SEQ_EXECUTOR_SERVICE);}EventBus(String busName, EventExceptionHandler exceptionHandler, Executor executor) {this.busName = busName;this.dispatcher = Dispatcher.newDispatcher(exceptionHandler, executor);}public EventBus(EventExceptionHandler exceptionHandler) {this(DEFAULT_BUS_NAME, exceptionHandler, Dispatcher.SEQ_EXECUTOR_SERVICE);}/*** 将注册Subscriber的动作直接委托给Registry** @param subscriber*/@Overridepublic void register(Object subscriber) {this.registry.bind(subscriber);}/*** 解除注册同样委托给Registry** @param subscriber*/@Overridepublic void unregister(Object subscriber) {this.registry.unbind(subscriber);}/*** 提交Event到默认的topic** @param event*/@Overridepublic void post(Object event) {this.post(event, DEFAULT_TOPIC);}/*** 提交Event到指定的topic,具体的动作是由Dispatcher来完成的** @param event* @param topic*/@Overridepublic void post(Object event, String topic) {this.dispatcher.dispatch(this, registry, event, topic);}/*** 关闭销毁Bus*/@Overridepublic void close() {this.dispatcher.close();}/*** 返回Bus的名称** @return*/@Overridepublic String getBusName() {return this.busName;}
}
  • EventBus的构造除了名称之外,还需要有ExceptionHandler和Executor,后两个主要是给Dispatcher使用的。

  • registry和unregister都是通过Subscriber注册表来完成的。

  • Event的提交则是由Dispatcher来完成的。

  • Executor是使用JDK中的Executor接口,自定义的ThreadPool天生就是多线程并发执行任务的线程池,自带异步处理能力,但是无法做到同步任务处理,因此我们使用Executor可以任意扩展同步、异步的任务处理方式。


异步EventBus

如果想要使用异步的方式进行推送,可使用EventBus的子类AsyncEventBus 。

异步的EventBus比较简单,继承自同步Bus,然后将Thread-Per-Message用异步处理任务的Executor替换EventBus中的同步Executor即可

package com.artisan.busevent.impl;import com.artisan.busevent.intf.EventExceptionHandler;import java.util.concurrent.ThreadPoolExecutor;/*** @author 小工匠* @version 1.0 * @mark: show me the code , change the world*/
public class AsyncEventBus extends EventBus {AsyncEventBus(String busName, EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {super(busName, exceptionHandler, executor);}public AsyncEventBus(String busName, ThreadPoolExecutor executor) {this(busName, null, executor);}public AsyncEventBus(ThreadPoolExecutor executor) {this("default-async", null, executor);}public AsyncEventBus(EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor) {this("default-async", exceptionHandler, executor);}
}

重写了父类EventBus的构造函数,使用ThreadPoolExecutor替代Executor 。


Subscriber注册表Registry

注册表维护了topic和subscriber之间的关系,当有Event被post之后,Dispatcher需要知道该消息应该发送给哪个Subscriber的实例和对应的方法,Subscriber对象没有任何特殊要求,就是普通的类不需要继承任何父类或者实现任何接口

package com.artisan.busevent.impl;import com.artisan.busevent.annotations.Subscribe;
import com.artisan.busevent.relations.Subscriber;import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
class Registry {/*** 存储Subscriber集合和topic之间关系的map*/private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Subscriber>> subscriberContainer = new ConcurrentHashMap<>();public void bind(Object subscriber) {//获取Subscriber Object的方法集合然后进行绑定List<Method> subscribeMethods = getSubscribeMethods(subscriber);subscribeMethods.forEach(m -> tierSubscriber(subscriber, m));}public void unbind(Object subscriber) {//unbind为了提高速度,只对Subscriber进行失效操作subscriberContainer.forEach((key, queue) ->queue.forEach(s ->{if (s.getSubscribeObject() == subscriber) {s.setDisable(true);}}));}public ConcurrentLinkedQueue<Subscriber> scanSubscriber(final String topic) {return subscriberContainer.get(topic);}private void tierSubscriber(Object subscriber, Method method) {final Subscribe subscribe = method.getDeclaredAnnotation(Subscribe.class);String topic = subscribe.topic();//当某topic没有Subscriber Queue的时候创建一个subscriberContainer.computeIfAbsent(topic, key -> new ConcurrentLinkedQueue<>());//创建一个Subscriber并且加入Subscriber列表中subscriberContainer.get(topic).add(new Subscriber(subscriber, method));}private List<Method> getSubscribeMethods(Object subscriber) {final List<Method> methods = new ArrayList<>();Class<?> temp = subscriber.getClass();//不断获取当前类和父类的所有@Subscribe方法while (temp != null) {//获取所有的方法Method[] declaredMethods = temp.getDeclaredMethods();//只有public方法 &&有一个入参 &&最重要的是被@Subscribe标记的方法才符合回调方法Arrays.stream(declaredMethods).filter(m -> m.isAnnotationPresent(Subscribe.class)&& m.getParameterCount() == 1&& m.getModifiers() == Modifier.PUBLIC).forEach(methods::add);temp = temp.getSuperclass();}return methods;}
}

由于Registry是在Bus中使用的,不能暴露给外部,因此Registry被设计成了包可见的类,所设计的EventBus对Subscriber没有做任何限制,但是要接受event的回调则需要将方法使用注解@Subscribe进行标记(可指定topic),同一个Subscriber的不同方法通过@Subscribe注解之后可接受来自两个不同的topic消息,比如

/**
*非常普通的对象
*/
public class SimpleObject
{/***subscribe方法,比如使用@Subscribe标记,并且是void类型且有一个参数*/@Subscribe(topic = "artisan-topic")public void test2(Integer x){}@Subscribe(topic = "test-topic")public void test3(Integer x){}
}

SimpleObject的实例被注册到了Event Bus之后,test2和test3这两个方法将会被加入到注册表中,分别用来接受来自artisan-topic和test-topic的event


Event广播Dispatcher

Dispatcher的主要作用是将EventBus post的event推送给每一个注册到topic上的subscriber上,具体的推送其实就是执行被@Subscribe注解的方法.

package com.artisan.busevent.impl;import com.artisan.busevent.relations.Subscriber;
import com.artisan.busevent.intf.Bus;
import com.artisan.busevent.intf.EventContext;
import com.artisan.busevent.intf.EventExceptionHandler;import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
public class Dispatcher {private final Executor executorService;private final EventExceptionHandler exceptionHandler;public static final Executor SEQ_EXECUTOR_SERVICE = SeqExecutorService.INSTANCE;public static final Executor PRE_THREAD_EXECUTOR_SERVICE = PreThreadExecutorService.INSTANCE;private Dispatcher(Executor executorService, EventExceptionHandler exceptionHandler) {this.executorService = executorService;this.exceptionHandler = exceptionHandler;}public void dispatch(Bus bus, Registry registry, Object event, String topic) {//根据topic获取所有的Subscriber列表ConcurrentLinkedQueue<Subscriber> subscribers = registry.scanSubscriber(topic);if (null == subscribers) {if (exceptionHandler != null) {exceptionHandler.handle(new IllegalArgumentException("The topic " + topic + " not bind yet"),new BaseEventContext(bus.getBusName(), null, event));}return;}//遍历所有的方法,并且通过反射的方式进行方法调用subscribers.stream().filter(subscriber -> !subscriber.isDisable()).filter(subscriber ->{Method subscribeMethod = subscriber.getSubscribeMethod();Class<?> aClass = subscribeMethod.getParameterTypes()[0];return (aClass.isAssignableFrom(event.getClass()));}).forEach(subscriber -> realInvokeSubscribe(subscriber, event, bus));}private void realInvokeSubscribe(Subscriber subscriber, Object event, Bus bus) {Method subscribeMethod = subscriber.getSubscribeMethod();Object subscribeObject = subscriber.getSubscribeObject();executorService.execute(() ->{try {subscribeMethod.invoke(subscribeObject, event);} catch (Exception e) {if (null != exceptionHandler) {exceptionHandler.handle(e, new BaseEventContext(bus.getBusName(), subscriber, event));}}});}public void close() {if (executorService instanceof ExecutorService) {((ExecutorService) executorService).shutdown();}}static Dispatcher newDispatcher(EventExceptionHandler exceptionHandler, Executor executor) {return new Dispatcher(executor, exceptionHandler);}static Dispatcher seqDispatcher(EventExceptionHandler exceptionHandler) {return new Dispatcher(SEQ_EXECUTOR_SERVICE, exceptionHandler);}static Dispatcher perThreadDispatcher(EventExceptionHandler exceptionHandler) {return new Dispatcher(PRE_THREAD_EXECUTOR_SERVICE, exceptionHandler);}/*** 顺序执行的ExecutorService*/private static class SeqExecutorService implements Executor {private final static SeqExecutorService INSTANCE = new SeqExecutorService();@Overridepublic void execute(Runnable command) {command.run();}}/*** 每个线程负责一次消息推送*/private static class PreThreadExecutorService implements Executor {private final static PreThreadExecutorService INSTANCE = new PreThreadExecutorService();@Overridepublic void execute(Runnable command) {new Thread(command).start();}}/*** 默认的EventContext实现*/private static class BaseEventContext implements EventContext {private final String eventBusName;private final Subscriber subscriber;private final Object event;private BaseEventContext(String eventBusName, Subscriber subscriber, Object event) {this.eventBusName = eventBusName;this.subscriber = subscriber;this.event = event;}@Overridepublic String getSource() {return this.eventBusName;}@Overridepublic Object getSubscriber() {return subscriber != null ? subscriber.getSubscribeObject() : null;}@Overridepublic Method getSubscribe() {return subscriber != null ? subscriber.getSubscribeMethod() : null;}@Overridepublic Object getEvent() {return this.event;}}
}

在Dispatcher中,除了从Registry中获取对应的Subscriber执行之外,我们还定义了几个静态内部类,其主要是实现了Executor接口和EventContent接口。


测试

简单的Subscriber

package com.artisan.busevent.domains;import com.artisan.busevent.annotations.Subscribe;
import lombok.extern.slf4j.Slf4j;/*** @author 小工匠* @version 1.0 * @mark: show me the code , change the world*/
@Slf4j
public class SubscriberA {@Subscribepublic void methodA1(String message) {log.info("SubscriberA-->methodA1-->收到Message(字符串)--> {}", message);}@Subscribe(topic = "test")public void methodA2(String message) {log.info("SubscriberA-->methodA2-->收到Message-(对象)-> {}", message);}
}
package com.artisan.busevent.domains;import com.artisan.busevent.annotations.Subscribe;
import com.artisan.busevent.entity.Artisan;
import lombok.extern.slf4j.Slf4j;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/
@Slf4j
public class SubscriberB {@Subscribepublic void methodB1(String message) {log.info("SubscriberB-->methodB1-->收到Message(字符串)--> {}", message);}@Subscribe(topic = "test")public void methodB2(Artisan artisan) {log.info("SubscriberB-->methodB-->收到Message(对象)--> {} ", artisan);}
}
package com.artisan.busevent.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.List;/*** @author 小工匠* @version 1.0* @mark: show me the code , change the world*/@Data
@NoArgsConstructor
@AllArgsConstructor()
public class Artisan {private String name;private Integer age;private List<String> hobbies;
}

同步Event Bus

异步Event Bus

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/14385.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

AIGC - Stable Diffusion 的 AWPortrait 1.1 模型与 Prompts 设置

欢迎关注我的CSDN&#xff1a;https://spike.blog.csdn.net/ 本文地址&#xff1a;https://spike.blog.csdn.net/article/details/131565908 AWPortrait 1.1 网址&#xff1a;https://www.liblibai.com/modelinfo/721fa2d298b262d7c08f0337ebfe58f8 介绍&#xff1a;AWPortrai…

使用LiteSpeed缓存插件将WordPress优化到100%的得分

页面速度优化应该是每个网站所有者的首要任务&#xff0c;因为它直接影响WordPress SEO。此外&#xff0c;网站加载的时间越长&#xff0c;其跳出率就越高。这可能会阻止您产生转化并为您的网站带来流量。 使用正确的工具和配置&#xff0c;缓存您的网站可以显着提高其性能。因…

Spring系列3 -- 更简单的读取和存储对象

前言 上一篇章总结了,Spring的创建与使用,通过创建Maven项目配置Spring的环境依赖,创建Spring框架的项目,然后通过在Resource目录下创建Spring-config.xml配置文件,添加<bean></bean>标签将我们需要的bean对象注入到容器中,然后通过ApplicationContext获取Spring上…

web服务端接收多用户并发上传同一文件,保证文件副本只存在一份(附go语言实现)

背景 对于一个文件服务器来说&#xff0c;对于同一文件&#xff0c;应该只保存一份在服务器上。基于这个原则&#xff0c;引发出本篇内容。 本篇仅阐述文件服务器在同一时间接收同一文件的并发问题&#xff0c;这种对于小体量的服务来说并不常见&#xff0c;但是最好还是要留…

文本分析-使用Python做词频统计分析

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

【滑动窗口】209. 长度最小的子数组

209. 长度最小的子数组 解题思路 滑动窗口设置前后指针滑动窗口内的元素之和总是大于或者等于s滑动窗口的起始位置: 如果窗口的值大于等于s 窗口向前移动窗口结束位置:for循环的j class Solution {public int minSubArrayLen(int target, int[] nums) {int left 0;// 滑动窗口…

学习系统编程No.28【多线程概念实战】

引言&#xff1a; 北京时间&#xff1a;2023/6/29/15:33&#xff0c;刚刚更新完博客&#xff0c;目前没什么状态&#xff0c;不好趁热打铁&#xff0c;需要去睡一会会&#xff0c;昨天睡的有点迟&#xff0c;然后忘记把7点到8点30之间的4个闹钟关掉了&#xff0c;恶心了我自己…

使用 Rust 实现连接远程 Linux 服务器、发送文件、执行命令

使用 Rust 实现连接远程 Linux 服务器、发送文件、执行命令 文章目录 使用 Rust 实现连接远程 Linux 服务器、发送文件、执行命令一、Rust 概述使用场景优点缺点 二、功能实现1、代码2、运行日志3、服务器文件 一、Rust 概述 Rust 已经听了无数遍&#xff0c;我很清楚它很强&am…

LVS-DR集群

目录 一、构建LVS-DR集群的步骤 实验环境准备&#xff1a; 1、配置负载调度器&#xff08;192.168.40.200&#xff09; 1.1 配置虚拟 IP 地址&#xff08;VIP&#xff1a;192.168.40.190&#xff09; 1.2 调整 proc 响应参数 1.3 配置负载分配策略 2. 部署共享存储&#xf…

Elasticsearch 基本使用(五)查询条件匹配方式(query query_string)

查询条件匹配方式 概述querytermtermsrangematch_allmatchmatch 匹配精度问题 match_phrasematch_pharse_prefixmatch_bool_prefixmulti_match query_string简单查询一个字段在多个字段上应用同一个条件 &#xff08;类似multi_match&#xff09;在所有字段上应用同一个条件 &a…

html掉落本地图片效果

实现一个加载本地图片并掉落的html页面。 说明 将DuanWu.html与zongzi_1.png, zongzi_2.png, zongzi_3.png, yadan.png4张图片放在同一个目录下&#xff0c;然后双击打开DuanWu.html即可。 使用Chrome或Microsoft Edge浏览器打开 若使用IE浏览器打开&#xff0c;下方会出现In…

什么是敏捷测试?

目录 前言&#xff1a; 敏捷测试的定义 敏捷测试的特点 为什么要敏捷测试 缩短价值交付周期 强调质量属于大家 化繁为简节省成本 敏捷测试VS. 传统测试 传统测试如何迁移到敏捷测试 1. 组织文化的转变 2. 组织架构的调整 3. 人员培训与指导 4. 轻流程 敏捷测试成…