基于SpringBoot实现一个可扩展的事件总线

基于SpringBoot实现一个可扩展的事件总线

前言

在日常开发中,我们经常会用到事件总线,SpringBoot通过事件多播器的形式为我们提供了一个事件总线,但是在开发中我们经常会用到其他的实现,比如Guava、Disruptor的。我们将基于SpringBoot封装一套底层驱动可扩展的,统一api的事件驱动组件。

环境准备

jdk1.8
spring-boot-autoconfigure
Guava
Disruptor

pom文件如下

Copy<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.5</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.billetsdoux</groupId><artifactId>eventBus</artifactId><version>1.0.0</version><name>eventBus</name><description>eventBus</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>31.1-jre</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><scope>compile</scope><version>5.8.9</version></dependency><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.4.4</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency></dependencies></project>

组件介绍

整体架构#

目录结构如下:

[外链图片转存中…(img-m0HXGURP-1703487623977)]

我们的核心是一个EventListenerRegistry,它便是我们提供统一api的入口,它有两个方法,一个是init方法,在SpringBoot容器启动的时候会去注册我们所有的事件监听器,publish 方法则为事件发布的方法。这里我为它提供了3种实现,GuavaSpringDisruptor

[外链图片转存中…(img-AtJntQUE-1703487623978)]

EventModel#

这是我们定义的事件模型,topic为事件主题,我们通过不同的topic对应不同的事件处理器,entity为具体的事件对象模型

Copypackage com.billetsdoux.eventbus.model;import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@Data
@NoArgsConstructor
public class EventModel<T> implements Serializable {/***  事件发布主题*/private String topic;/***  事件对象模型*/private T entity;
}

EventListener#

EventListener 为事件消费接口,定义了2个方法,topic() 为监听的事件topic,onMessage()为事件的回调接口。

Copypackage com.billetsdoux.eventbus;/***  消费接口*/
public interface EventListener<T> {String topic();void onMessage(T message);
}

EventListenerRegistry#

这边是我们之前介绍的事件核心接口,它提供两个接口 initRegistryEventListener 负责注册我们所定义的所有事件监听器,publish 负责发送消息,我们底层的驱动需要继承这个接口。

Copypublic interface EventListenerRegistry<P> {void initRegistryEventListener(List<EventListener> eventConsumerList);void publish(P param);
}

SpringEventListenerRegistry#

这是我们通过Spring为我们提供的消息多播器来实现的一个事件驱动。这个类被@Component标记,那么它会在容器启动的时候,通过构造器为我们注入 eventListeners ,applicationContext 。eventListeners 为所有实现了EventListener接口,并被注入到容器里面的类。

initRegistryEventListener 这是一个空方法,因为他们本身已经在容器中了,所以不需要注册了

publish: 直接调用applicationContext.publishEvent就可以了。

Copypackage com.billetsdoux.eventbus.spring;
import com.billetsdoux.eventbus.EventListener;
import com.billetsdoux.eventbus.EventListenerRegistry;
import com.billetsdoux.eventbus.model.EventModel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.List;@RequiredArgsConstructor
@Slf4j
@Component
public class SpringEventListenerRegistry implements EventListenerRegistry<EventModel> {final ApplicationContext applicationContext;final List<EventListener> eventListeners;@Overridepublic void initRegistryEventListener(List<EventListener> eventConsumerList) {}@Overridepublic void publish(EventModel param) {applicationContext.publishEvent(param);}@PostConstructpublic void init(){log.info("开始初始化Spring事件监听器的组件服务");initRegistryEventListener(eventListeners);log.info("完成初始化Spring事件监听器的组件服务");}
}

GuavaEventListenerRegistry#

基于Guava来实现的事件总线,我们首先还是需要容器帮我们注入eventListeners。相较于Spring我们需要自己定义一个Guava的EventBus,然后把我们的Listener注册到这个EventBus中。

publish方法则是调用EventBus的post方法到。

Copy
package com.billetsdoux.eventbus.guava;import cn.hutool.core.thread.ThreadUtil;import com.billetsdoux.eventbus.EventListener;
import com.billetsdoux.eventbus.EventListenerRegistry;
import com.billetsdoux.eventbus.model.EventModel;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.objenesis.instantiator.util.ClassUtils;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.ExecutorService;@Component("guava")
@Slf4j
public class GuavaEventListenerRegistry implements EventListenerRegistry<EventModel> {EventBus eventBus;final List<EventListener> eventListeners;public GuavaEventListenerRegistry(List<EventListener> eventListeners) {this.eventListeners = eventListeners;}@Overridepublic void initRegistryEventListener(List<EventListener> eventConsumerList) {final ExecutorService executor = ThreadUtil.newExecutor(10, 20, 300);eventBus = new AsyncEventBus(GuavaEventListenerRegistry.class.getName(),executor);eventConsumerList.forEach(param->{log.info("注册监听器:{}",param.getClass().getName());eventBus.register(ClassUtils.newInstance(param.getClass()));});}@Overridepublic void publish(EventModel param) {eventBus.post(param);}@PostConstructpublic void init(){log.info("开始初始化Guava事件监听器的组件服务");initRegistryEventListener(eventListeners);log.info("完成初始化Guava事件监听器的组件服务");}}

DisruptorEventListenerRegistry#

Disruptor的实现相对来说麻烦一点,它首先需要一个实现了EventFactory接口的类,它提供一个newInstance接口来创建事件对象模型。

具体的使用方式可以参考我这篇博文:Disruptor入门

EventModelFactory

我们首先还是需要注入我们的Listener,只是这里在init的时候是将我们的Listener交给我们的Disruptor去处理,我们先将Listener转成EventHandler,所以我们的监听器接口具体实现的时候除了实现我们定义的EventListener接口外还需要继承Disruptor的EventHandler接口。 调用disruptor.handleEventsWith(dataListener); 把我们的Listener交给Disruptor去管理。最后再启动Disruptor。

publish:调用Disruptor的RingBuffer来进行消息的发送。

Copy
/***  事件工厂*  Disruptor 通过EventFactory在RingBuffer中预创建Event的实例* @param <T>*/
public class EventModelFactory<T> implements EventFactory<EventModel<T>> {@Overridepublic EventModel<T> newInstance() {return new EventModel<>();}
}
Copy@Slf4j
@RequiredArgsConstructor
@Component("disruptor")
@Scope("prototype") // 线程安全问题
public class DisruptorEventListenerRegistry implements EventListenerRegistry<EventModel>,AutoCloseable {/***  disruptor事件处理器*/@Getter@Setterprivate Disruptor<EventModel> disruptor;@NonNullfinal List<EventListener> eventListeners;/***  RingBuffer的大小*/private final int DEFAULT_RING_SIZE = 1024 * 1024;/***  事件工厂*/private EventFactory<EventModel> eventFactory = new EventModelFactory();@Overridepublic void initRegistryEventListener(List<EventListener> eventConsumerList) {disruptor = new Disruptor<>(eventFactory, DEFAULT_RING_SIZE, createThreadFactory(), ProducerType.SINGLE, new BlockingWaitStrategy());EventHandler[] dataListener = eventConsumerList.stream().map(param -> {EventListener<EventModel> eventModelEventListener = param;return eventModelEventListener;}).collect(Collectors.toList()).toArray(new EventHandler[eventConsumerList.size()]);log.info("注册服务信息接口:{}",dataListener);disruptor.handleEventsWith(dataListener);disruptor.start();}@Overridepublic void publish(EventModel param) {publishEvent(param);}public void publishEvent(EventModel... eventModels){Objects.requireNonNull(disruptor, "当前disruptor核心控制器不可以为null");Objects.requireNonNull(eventModels, "当前eventModels事件控制器不可以为null");// 发布事件final RingBuffer<EventModel> ringBuffer = disruptor.getRingBuffer();try {final List<EventModel> dataList = Arrays.stream(eventModels).collect(Collectors.toList());for (EventModel element : dataList) {// 请求下一个序号long sequence = ringBuffer.next();// 获取该序号对应的事件对象EventModel event =  ringBuffer.get(sequence);event.setTopic(element.getTopic());event.setEntity(element.getEntity());ringBuffer.publish(sequence);}}catch (Exception e) {log.error("error",e);}}/***  关闭处理机制* @throws Exception*/@Overridepublic void close() throws Exception {if (Objects.nonNull(disruptor)) disruptor.shutdown();}@PostConstructpublic void init(){log.info("开始初始化Disruptor事件监听器的组件服务");initRegistryEventListener(eventListeners);log.info("完成初始化Disruptor事件监听器的组件服务");}private static ThreadFactory createThreadFactory(){AtomicInteger integer = new AtomicInteger();return r-> new Thread(r,"disruptor-"+integer.incrementAndGet());}}

至此我们已经实现了我们的目标三个EventListenerRegistry,我们接下来看看我们Listener如何实现。

BaseEventListener#

我们刚说过我们的Listener需要同时实现EventHandler跟EventListener,所以我们定义一个抽象类,注意这个EventListener是我们定义的,EventHandler是Disruptor定义的。

Copypublic abstract class BaseEventListener<T> implements EventListener<T>, EventHandler<T> {}

ExecutableEventListener#

我们定义一个抽象类ExecutableEventListener 我们来实现一下里面的方法。

对于Spring跟Guava来说只需要在方法上添加注解便可以在事件发生的时候回调过来,而对于Disruptor来说它的回调是继承EventHandler里面的onEvent方法。所以我们在onEvent里面手动调用onMessage方法,让所有的消息都转发给onMessage处理。

@org.springframework.context.event.EventListener Spring的回调注解

@Subscribe 的回调注解

onMessage:我们先调用topic()方法获取Listener方法的topic,这个方法我们这里先不实现,交给具体的实现类去实现这个方法。我们再定义一个handle的抽象方法,则是我们具体的消息处理逻辑的方法,也交给具体的实现类去实现。

Copy@Slf4j
public abstract class ExecutableEventListener extends BaseEventListener<EventModel<?>> {@org.springframework.context.event.EventListener@Subscribe@Overridepublic void onMessage(EventModel<?> message) {log.info("收到消息:{}",message);if (topic().equals(message.getTopic())){handle(message);}}@Overridepublic void onEvent(EventModel<?> event, long sequence, boolean endOfBatch) throws Exception {onMessage(event);}/***  具体消息处理方法* @param message*/protected abstract void handle(EventModel<?> message);}

至此我们的核心代码就开发完成了,现在定义两个注解,让我们能够在项目中启用它。

EnableEventBus:在启动类上添加这个注解以启用EventBus

[外链图片转存中…(img-OjGimDb2-1703487623978)]

EventBusConfiguration:配置一下Spring的包扫描路径
img

测试

我们把我们刚写的项目install到本地maven仓库,以便我们在项目中能够引用它。我们新建一个SpringBootWeb项目添加这个依赖测试下

在pom中添加

Copy  <dependency><groupId>com.billetsdoux</groupId><artifactId>eventBus</artifactId><version>1.0.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>

在启动类上添加这个注解启用

img

添加一个事件监听器,继承ExecutableEventListener,监听”blog“消息的主题。

img

添加一个Controller来测试一下我们的事件总线。它根据我们的type来选择不同的底层驱动:spring,guava,disruptor

[外链图片转存中…(img-ZndsBWCc-1703487623979)]

我们打包成docker镜像然后启动。
Dockerfile如下:

CopyFROM openjdk:8-jre-slim
MAINTAINER billtsdouxWORKDIR /appADD target/eventbus_blog*.jar app.jarEXPOSE 8080ENV JVM_OPTS="-Xmx256m -Xms256m" \TZ=Asia/ShanghaiRUN ln -sf /usr/share/zoneinfo/$TZ /etc/localtime \&& echo $TZ > /etc/timezoneENTRYPOINT ["sh","-c","java -jar $JVM_OPTS app.jar"]

我们这里配置一下打包后镜像的名称,已经启动的容器名称跟监听的端口。

[外链图片转存中…(img-xKzuGe5r-1703487623980)]

构建成功

img

并且也启动一个容器:

[外链图片转存中…(img-QETw4G24-1703487623980)]

查看日志可以看到我们内置的三个监听器注册器已经成功启动了。

[外链图片转存中…(img-ScQl7W0a-1703487623980)]

我们测试一下接口:可以看到根据我们选择的不同类型我们可以选择不同的实现。

[外链图片转存中…(img-aFD49ezm-1703487623981)]

[外链图片转存中…(img-CnPONtYE-1703487623981)]

[外链图片转存中…(img-mDQW65LN-1703487623981)]

后言

如果提供这个三个不够用,我们还可以通过实现这个接口EventListenerRegistry来扩展我们的事件总线组件,再注入到容器中,在调用的时候选择具体的实现就好了。

标签: java , Spring

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/298910.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Webpack基础使用

目录 一.什么是Webpack 二.为什么要使用Webpack 三.Webpack的使用 1.下载yarn包管理器 2.Webpack的安装 3.Webpack的简单使用 4.效果 四.Webpack打包流程 一.什么是Webpack Webpack是一个静态模块打包工具 二.为什么要使用Webpack 在开发中&#xff0c;我们常常会遇到…

多媒体领域顶会ACM MM 2023 闭幕,获奖论文一览!

多媒体领域顶会 国际多媒体会议&#xff08;The 31th ACM International Conference on Multimedia&#xff0c;ACM MM&#xff09;于2023年10月28日至11月3日在加拿大渥太华举行&#xff0c;该会议是计算机图形学与多媒体领域顶级会议&#xff0c;被中国计算机学会列为A类会议…

[SWPUCTF 2021 新生赛]hardrce

[SWPUCTF 2021 新生赛]hardrce wp 参考博客&#xff1a;https://www.cnblogs.com/bkofyZ/p/17644820.html 代码审计 题目的代码如下&#xff1a; <?php header("Content-Type:text/html;charsetutf-8"); error_reporting(0); highlight_file(__FILE__); if(is…

Python 爬虫之下载视频(五)

爬取第三方网站视频 文章目录 爬取第三方网站视频前言一、基本情况二、基本思路三、代码编写四、注意事项&#xff08;ffmpeg&#xff09;总结 前言 国内主流的视频平台有点难。。。就暂且记录一些三方视频平台的爬取吧。比如下面这个&#xff1a; 一、基本情况 这次爬取的方…

PSoc62™开发板之按键控制LED

实验目的 使用板子上的用户自定义按键控制LED亮灭&#xff0c;当按键按下时LED亮起来&#xff0c;不按下则不亮 电路图 按键电路 板子有两组按键&#xff0c;分别是系统复位按键和用户自定义按键&#xff0c;这里我们选择控制用户自定义按键&#xff0c;可以看到MCU_USER_B…

学习 Web 开发

学习 Web 开发 | MDN (mozilla.org)https://developer.mozilla.org/zh-CN/docs/Learn 从零开始学习 Web 开发极具挑战性&#xff0c;该教程将为你提供详细的资料&#xff0c;手把手帮助你轻松愉快地学习。无论你是正在学习 Web 开发的学生&#xff08;自学或参与课程&…

基于 Webpack 插件体系的 Mock 服务

背景 在软件研发流程中&#xff0c;对于前后端分离的架构体系而言&#xff0c;为了能够更快速、高效的实现功能的开发&#xff0c;研发团队通常来说会在产品原型阶段对前后端联调的数据接口进行结构设计及约定&#xff0c;进而可以分别同步进行对应功能的实现&#xff0c;提升研…

ctf web赛道基础 万字笔记

一、SQL注入&#xff08;mysql&#xff09;&#xff1a; 基本语法 判断列数 order by 3 查询所有数据库&#xff1a; select group_concat(schema_name) from information_schema.schemata 查询当前数据库的所有表&#xff1a; select group_concat(table_name) from informa…

数字人直播系统源码开发:实现电商必备的一键生成真人直播卖货

随着互联网技术的不断演进和电子商务的蓬勃发展&#xff0c;直播电商成为了一种新兴的销售模式。然而&#xff0c;传统的直播方式存在着一些问题&#xff0c;比如主播的时间和精力有限&#xff0c;无法满足大量商品的销售需求。为了解决这个问题&#xff0c;数字人直播系统应运…

【计算机四级(网络工程师)笔记】操作系统运行机制

目录 一、中央处理器&#xff08;CPU&#xff09; 1.1CPU的状态 1.2指令分类 二、寄存器 2.1寄存器分类 2.2程序状态字&#xff08;PSW&#xff09; 三、系统调用 3.1系统调用与一般过程调用的区别 3.2系统调用的分类 四、中断与异常 4.1中断 4.2异常 &#x1f308;嗨&#xff…

华为OD机试 - 学生方阵 - 矩阵(Java 2023 B卷 200分)

目录 专栏导读一、题目描述二、输入描述三、输出描述1、输入2、输出 四、解题思路1、题目解析2、解体思路 五、Java算法源码再重新读一遍题目&#xff0c;看看能否优化一下~ 六、效果展示1、输入2、输出3、说明 华为OD机试 2023B卷题库疯狂收录中&#xff0c;刷题点这里 专栏导…

GPU性能实时监测的实用工具

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…