SpringBoot - Google EventBus、AsyncEventBus

介绍

EventBus 顾名思义,事件总线,是一个轻量级的发布/订阅模式的应用模式,最初设计及应用源与 google guava 库。

相比于各种 MQ 中间件更加简洁、轻量,它可以在单体非分布式的小型应用模块内部使用(即同一个JVM范围)。

我们也可以把它和 MQ 中间件结合起来使用,使用 EventBus 作为当前应用程序接收中间件 MQ 消息的统一入口,然后应用内部基于 EventBus 进行分发订阅,以达到高内聚低耦合的目的(当应用内部需要消费多种不同 MQ 中间件消息时,不需要在当前应用的好多不同代码位置都编写 MQ 消费代码)。

EventBus 整体设计和流程比较简单,由注册、发布和订阅三个要点组成,如下:
在这里插入图片描述

注意事项

本文对 google guava 库中的 EventBus 进行实例说明,注意事项要先进行特别说明。

  • EventBus 默认为同步调用,同一个 EventBus 中注册的多个订阅处理,再事件下发后是被总线串行逐个调用的,如果其中一个方法占用事件较长,则同一个 EventBus 中的其他事件处于等待状态,且发送消息事件的代码调用处也是同步调用等待的状态。
  • 同一个 EventBus 对象,不仅仅在同一个 post 调用中串行执行,在多次并发 post 调用时,多个 post 调用之间也是串行等待执行的关系,这个要特别注意,应用不当会导致严重的消息消费处理性能瓶颈问题!

所以推荐使用异步的方式处理,异步处理主要包括 “EventBus 使用线程池统一异步” 和 “订阅消费处理代码自己使用线程异步” 两种方式。这里我更推荐使用前者,因为后者对开发者有一定的要求,加入开发者某个耗时的业务订阅实现没有自行使用线程异步处理,则会影响其他处的订阅处理。

代码示例

1、添加 pom 依赖

        <!-- google EvengBus 在 guava 包中 --><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>32.1.2-jre</version></dependency><!-- lombok 非必须,其作用你懂得 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.28</version><scope>provided</scope></dependency>

2、创建一个Java接口用于自动注册

package com.example.demospringbean.eventbus;/*** 用于自动注册事件订阅类的接口* * @author shanhy* @date 2023-08-30 12:06*/
public interface EventBusListener {
}

3、编写总配置类

package com.example.demospringbean.eventbus;import com.google.common.eventbus.EventBus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.List;/*** EventBus 配置类** @author shanhy* @date 2023-08-30 11:11*/
@Configuration
public class EventBusConfiguration {/*** 实例化 EventBus 对象,并自动注册所有订阅类对象** @param eventListenerList 所有实现了 EventBusListener 接口的实现类* @return*/@Beanpublic EventBus eventBus(List<EventBusListener> eventListenerList){// 异步处理,按照自己需要,实现自己的 Executor 逻辑,例如为了防止线程长期占用需要增加超时机制等
//      EventBus eventBus = new AsyncEventBus(new Executor() {
//            public void execute(Runnable command) {
//                new Thread(command).start();
//            }
//        });EventBus eventBus = new EventBus();if(eventListenerList != null && !eventListenerList.isEmpty()) {eventListenerList.iterator().forEachRemaining(eventListener -> eventBus.register(eventListener));}return eventBus;}}

4、编写订阅测试类

package com.example.demospringbean.eventbus;import com.google.common.eventbus.Subscribe;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/*** @author shanhy* @date 2023-08-30 11:19*/
@Component
public class EventSub1 implements EventBusListener {@Subscribepublic void handlerEvent(String test) {System.out.println("11111>>>>>" + test);}@Subscribepublic void handlerEvent2(String test) throws InterruptedException {TimeUnit.SECONDS.sleep(5);System.out.println("22222>>>>>" + test);}}
package com.example.demospringbean.eventbus;import lombok.Builder;
import lombok.Data;/*** @author shanhy* @date 2023-08-30 13:19*/
@Data
@Builder
public class User {private String name;private int age;}
package com.example.demospringbean.eventbus;import com.google.common.eventbus.Subscribe;
import org.springframework.stereotype.Component;/*** @author shanhy* @date 2023-08-30 11:19*/
@Component
public class EventSub2 implements EventBusListener {@Subscribepublic void handlerEvent(String test){System.out.println("33333>>>>>" + test);}@Subscribepublic void handlerEvent2(User user){System.out.println("44444>>>>>" + user.getName());}}

5、编写消息事件发送测试

package com.example.demospringbean;import com.example.demospringbean.eventbus.User;
import com.google.common.eventbus.EventBus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** 接口示例** @author shanhy* @date 2023-03-20 15:49*/
@RestController
@RequestMapping("/test")
public class TestController {@Autowiredprivate EventBus eventBus;@GetMapping("/testEvent1")public String testEvent1(){eventBus.post("Hello");return "OK";}@GetMapping("/testEvent2")public String testEvent2(){eventBus.post(User.builder().name("Tome").age(22).build());return "OK";}
}

代码说明:

1、以上代码使用的 EventBus、未使用 AsyncEventBus,并加入了线程 sleep,是为了运行代码可以观察其串行处理效果(浏览器开2个Tab同时调用 /testEvent1 观察输出),让你能更明显的感受到这种处理会给程序带来多大的性能问题(推荐实际业务生产中使用 AsyncEventBus)。

2、@Subscribe 注解修饰的事件处理方法,其参数和发送事件时的消息体会自动按类型关联对应。只有相同类型的消息体才会被消费处理。例如示例中 /testEvent1 接口发送的 “Hello” 字符串,不会触发 handlerEvent2(User user) 方法的执行,同理执行示例中 /testEvent2 接口发送 User 对象时,只会触发 handlerEvent2(User user) 方法。


(END)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/90785.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Spring Boot】数据库持久层框架MyBatis — MyBatis简介

MyBatis简介 本节首先会介绍什么是ORM、什么是MyBatis、MyBatis的特点以及核心概念&#xff0c;最后介绍MyBatis是如何启动、如何加载配置文件的&#xff1f; 1.什么是ORM ORM&#xff08;Object Relational Mapping&#xff0c;对象关系映射&#xff09;是为了解决面向对象…

【STM32】学习笔记(TIM定时器)-江科大

TIM&#xff08;Timer&#xff09;定时器 定时器可以对输入的时钟进行计数&#xff0c;并在计数值达到设定值时触发中断 16位计数器、预分频器、自动重装寄存器的时基单元&#xff0c;在72MHz计数时钟下可以实现最大59.65s的定时 不仅具备基本的定时中断功能&#xff0c;而且…

什么是住宅ip,静态和动态怎么选?

上文我们介绍了数据中心代理&#xff0c;这次我们来介绍下住宅代理ip&#xff0c;住宅代理ip分类两种类型&#xff1a;静态住宅代理和动态住宅代理&#xff0c;他们有什么区别又能用在什么场景呢&#xff1f;我们先从他们是如何运作开始。 一、什么是住宅代理ip isp住宅代理i…

SQL语言-01

SQL Structured Query Language 的简单介绍 SQL 中的书写规则 SQL 中的数据类型

KylinOS配置完静态IP地址后,保存按钮是灰色

问题: 配置完静态IP地址后,保存按钮置灰,并且提示“无效设置IPv4设置:ipv4.gateway:网关与”never-default”不兼容”。 原因: 这是由于禁止添加默认路由导致的。 解决方案: 1、使用nmcli命令: nmcli con modify "有线连接 1" ipv4.never-default no 执…

数据结构--队列与循环队列

队列 队列是什么&#xff0c;先联想一下队&#xff0c;排队先来的人排前面先出&#xff0c;后来的人排后面后出&#xff1b;队列的性质也一样&#xff0c;先进队列的数据先出&#xff0c;后进队列的后出&#xff1b;就像图一的样子&#xff1a; 图1 如图1&#xff0c;1号元素是…

K8S容器OOM killed排查

背景 数据服务平台南海容器k8s设置的内存上限2GB&#xff0c;多次容器被OOM killed。 启动命令 java -XX:MaxRAMPercentage70.0 -XX:HeapDumpOnOutOfMemoryError -XX:HeapDumpPath/apps/logs/ ***.jar排查过程 1 当收到实例内存超过95%告警时&#xff0c;把jvm进程堆dump下…

行政固定资产应该怎么管理

行政需要管理的固定资产主要包括办公设备、交通工具、通讯设备、家具等。具体来说&#xff0c;行政需要管理的固定资产包括但不限于&#xff1a;电脑、打印机、传真机、复印机、投影仪、电话、传真机、传真纸、电话线、路由器、交换机、服务器、UPS电源、办公桌椅、沙发等。 行…

链路聚合原理

文章目录 一、定义二、功能三、负载分担四、分类五、常用命令 首先可以看下思维导图&#xff0c;以便更好的理解接下来的内容。 一、定义 在网络中&#xff0c;端口聚合是一种将连接到同一台交换机的多个物理端口捆绑在一起&#xff0c;形成一个逻辑端口的技术。通过端口聚合&…

Django报错:SystemCheckError: System check identified some issues解决办法

今天练习django自定义标签时&#xff0c;一开始在APPbook中写了自定义标签book_tags.py 测试成功&#xff0c;之后新建了一个APPblogs&#xff0c;测试在blogs中创建模板使用自定义标签&#xff0c;于是直接把book/templatetags包直接赋值到blogs目录里。在页面里加载自定义标…

33、Flink之hive介绍与简单示例

Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…

Python文本终端GUI框架详解

今天笔者带大家&#xff0c;梳理几个常见的基于文本终端的 UI 框架&#xff0c;一睹为快&#xff01; Curses 首先出场的是 Curses。 Curses 是一个能提供基于文本终端窗口功能的动态库&#xff0c;它可以: 使用整个屏幕 创建和管理一个窗口 使用 8 种不同的彩色 为程序提供…