Spring Cloud Stream如何屏蔽不同MQ带来的差异性?

引言

在当前的微服务架构下,使用消息队列(MQ)技术是实现服务解耦和削峰填谷的重要策略。为了保证系统的灵活性和可替换性,我们需要避免对单一开源技术的依赖

市面上有多种消息队列技术,如 Kafka、RocketMQ、RabbitMQ 等。关键在于如何在微服务体系中实现这些MQ组件的无缝切换,以减少代码修改需求。

Spring Cloud Stream 通过其与主流消息中间件的灵活集成,实现了通过仅修改配置文件的方式来切换不同的MQ实现,从而提高了系统的适应性和可维护性。

什么是 Spring Cloud Stream

Spring Cloud Stream 是一个用于构建消息驱动的微服务应用程序的框架。

基于 Spring Boot 构建,用于创建独立的生产级 Spring 应用程序,并使用 Spring Integration 提供与消息代理的连接。它提供了来自多个供应商的中间件的固定配置,引入了持久发布-订阅语义、消费者组和分区的概念。

简单来说 Spring Cloud Stream 是对 Spring Integration 和 Spring Boot 的合并。

图一

图一

主要概念:

1. application model(应用模型)

图二.Spring Cloud Stream 应用程序

图二.Spring Cloud Stream 应用程序

由中间件提供的 Binder 来处理绑定。 应用程序通过绑定这个 Binder 与其建立联系,发送消息时应用程序通过 outputs 通道将消息传递给 BinderBinder 再把消息给消息中间件。接收消息时消息中间件将消息传递给 BinderBinder 再把消息通过 inputs 通道传递给应用程序。

比如 Kafka Binder 依赖如下图:

图三 spring cloud stream kafka依赖

图三 spring cloud stream kafka依赖

2. The Binder Abstraction(Binder抽象)

Binder 抽象使 Spring Cloud Stream 应用程序能够灵活地连接中间件。

Spring Cloud Stream 为 Kafka 和 RabbitMQ 提供了 Binder 实现。 RocketMQ Binder 已由 Spring Cloud Alibaba 实现。

Binder 抽象也是该框架的扩展点之一,我们可以在 Spring Cloud Stream 之上实现自定义 Binder。

3. Programming Model(编程模型)

核心概念

  • Destination Binders(目标绑定器):负责提供与外部消息传递系统集成的组件。

  • Bindings(绑定):外部消息系统和应用程序之间的桥梁,提供消息的生产者和消费者(由目标绑定器创建)。

  • Message(消息):生产者和消费者用于与目标绑定器(以及通过外部消息系统与其他应用程序)通信的规范数据结构。

图四

图四

环境搭建

本文环境:

  • Java:17

  • Spring Boot:3.0.2

  • Spring Cloud:2022.0.2

  • Spring Cloud Alibaba:2022.0.0.0

maven依赖配置

pom.xml依赖如下:

消息驱动jar,用哪个mq引入哪个即可。<dependencies><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency>
</dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring-cloud-alibaba.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>

配置文件

application.yml RocketMq 配置信息:

spring:cloud:stream:stream:rocketmq:binder:name-server: 127.0.0.1:9876;127.0.0.1:9877function:# 组装和绑定definition: myTopicCbinders:default:type: rocketmqbindings:## 生产者 新版本固定格式  函数名-{out/in}-{index}demoChannel-out-0:destination: boot-mq-topic## 消费者 新版本固定格式  函数名字-{out/in}-{index}demoChannel-in-0:destination: boot-mq-topic

application.yml Kafka 配置信息:

spring:cloud:stream:stream:kafka:binder:brokers: 127.0.0.1:9092function:# 组装和绑定definition: myTopicCbinders:default:type: kafkabindings:## 生产者 新版本固定格式  函数名-{out/in}-{index}demoChannel-out-0:destination: boot-mq-topic## 消费者 新版本固定格式  函数名字-{out/in}-{index}demoChannel-in-0:destination: boot-mq-topic

消息生产者

创建一个简单的消息生产者:

@RestController
@Slf4j
public class ProducerStream {@Autowiredprivate StreamBridge streamBridge;@GetMapping("/test-stream")public String testStream() {streamBridge.send("demoChannel-out-0",MessageBuilder.withPayload("消息体").build());return "success";}
}

消息消费者

创建一个消息消费者来接收消息:

@Slf4j
@Configuration
public class TestStreamConsumer {@Beanpublic Consumer<String> demoChannel() {return message -> {log.info("demoChannel接到消息:{}", message);};}
}

假如需要从 Kafka 替换成 RocketMq ,只需要修改pom文件和配置文件即可。

在之前的 Spring Cloud Stream 版本中是采用注解的方式来实现绑定,在新版本中是通过函数式编程模型来绑定名称。采用约定大于配置的思想,简化了应用程序配置。

具体可见官方文档:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names

Spring Cloud Stream 发送消息流程

图五 spring cloud stream消息流程图

图五 spring cloud stream消息流程图

消息模型

通过图三可以看到 Sping Cloud Stream 的依赖关系。

Sping Cloud Stream -> Spring Integration -> Spring Messaging

可以看出来 Sping Cloud Stream 是基于 Spring Integration 做了一层封装,是依赖于 Spring Integration 这个组件的,而 Spring Integration 则依赖于 Spring Messaging 组件来实现消息处理机制的基础设施。

Spring Integration 是对 Spring Messaging 的扩展,设计目标是系统集成,因此内部提供了大量的集成化端点方便应用程序直接使用。

各个异构系统相互集成时,Spring Integration 通过通道之间的消息传递,让我们可以在消息的入口和出口使用通道适配器和消息网关这两种典型的端点对消息进行同构化处理。

Spring MessagingSpring 框架中的一个底层模块,用于提供统一的消息编程模型。

消息 Message 接口定义:

public interface Message<T> {//消息体T getPayload();//消息头MessageHeaders getHeaders();
}

消息通道 MessageChannel 接口定义:

@FunctionalInterface
public interface MessageChannel {long INDEFINITE_TIMEOUT = -1;//发送消息,无限期阻塞default boolean send(Message<?> message) {return send(message, INDEFINITE_TIMEOUT);}//发送消息,阻塞直到到达指定超时时间boolean send(Message<?> message, long timeout);
}

消息通道 MessageChannel 接收消息,调用send()方法将消息发送至该消息通道。

消息通道可简单理解为对队列的一种抽象。通道的名称对应队列的名称。

Spring message 把通道抽象成两种基本表现形式

  • 支持轮询的 PollableChannel

  • 实现发布-订阅模式的 SubscribableChannel

这两个通道都继承自具有消息发送功能的 MessageChannel

public interface SubscribableChannel extends MessageChannel {//通过注册回调函数MessageHandler来实现事件响应//注册消息处理器boolean subscribe(MessageHandler handler);//取消注册消息处理器boolean unsubscribe(MessageHandler handler);
}
public interface PollableChannel extends MessageChannel {//通过轮询操作主动获取消息//从通道中接收消息@NullableMessage<?> receive();//指定超时时间,从通道中接收消息@NullableMessage<?> receive(long timeout);
}

MessageHandler接口定义:

@FunctionalInterface
public interface MessageHandler {//处理消息方法void handleMessage(Message<?> message) throws MessagingException;
}

再回到图五流程图中,我们最终可以看到 KafkaRocketMQ 通过继承 AbstractMessageHandler 抽象类( AbstractMessageHandler 抽象类是实现了 MessageHandler 接口)来实现不同中间件的消息发送操作。而这些都是封装在各自中间件对应的 Binder 代码中来实现。

结论

回到我们的主题,Spring Cloud Stream 如何屏蔽不同 MQ 带来的差异性?

  • 统一的编程模型:发送和接收代码一致,开发者专注于业务逻辑即可。不用管底层消息中间件的实现。

  • Binder 抽象:封装与消息队列的交互逻辑,每种队列有自己的 Binder 实现。

  • 自动配置和约定优于配置:采用约定大于配置的思想,极少的改动配置文件实现消息队列的切换,而代码不用变动。

  • 高级特性的抽象:如分区、消息分组、持久性订阅等高级特性,Spring Cloud Stream 提供了抽象层,由不同的消息队列去实现。

参考资料

  • 官方文档:Spring Cloud Stream Reference Guide

  • 《Spring核心技术和案例实战》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/231482.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

对数据页的理解

1.InnoDB 是如何存储数据的&#xff1f; 数据表中的记录是按照行来存储的&#xff0c;但是数据库的读取并不以「行」为单位&#xff0c;否则一次读取&#xff08;也就是一次 I/O 操作&#xff09;只能处理一行数据&#xff0c;效率会非常低。 因此&#xff0c;InnoDB 的数据是按…

人工智能-优化算法之动量法

对于嘈杂的梯度&#xff0c;我们在选择学习率需要格外谨慎。 如果衰减速度太快&#xff0c;收敛就会停滞。 相反&#xff0c;如果太宽松&#xff0c;我们可能无法收敛到最优解。 泄漏平均值 小批量随机梯度下降作为加速计算的手段。 它也有很好的副作用&#xff0c;即平均梯度…

Android端极致画质体验之HDR播放

高动态范围HDR视频通过扩大亮度分量的动态范围(从100cd/m2到1000cd/m2)&#xff0c;以及采用更宽的色彩空间BT2020&#xff0c;提供极致画质体验。从Android10开始&#xff0c;支持HDR视频播放。 一、HDR技术 HDR技术标准包括&#xff1a;Dolby-Vision、HDR10、HLG、PQ。支持…

使用STM32 HAL库驱动烟雾传感器的设计和优化

STM32 HAL库是STMicroelectronics提供的针对STM32系列微控制器的一套硬件抽象层库&#xff0c;可以简化开发过程并提供对各种外设的支持。本文将介绍如何使用STM32 HAL库来驱动烟雾传感器&#xff0c;并对传感器数据采集和处理进行优化。将包括HAL库的初始化、模拟信号采集、数…

【开源视频联动物联网平台】流媒体传输协议HLS,FLV的功能和特点

HLS&#xff08;HTTP Live Streaming&#xff09;和FLV&#xff08;Flash Video&#xff09;都是用于视频流传输的协议或容器格式&#xff0c;但它们在某些方面有着显著的区别和特点。 HLS是一种由苹果公司开发的用于流媒体传输的协议&#xff0c;而FLV则是Adobe公司开发的用于…

SparkSQL远程调试(IDEA)

启动Intellij IDEA&#xff0c;打开spark源码项目&#xff0c;配置远程调试 Run->Edit Configuration 启动远程spark-sql spark-sql --verbose --driver-java-options "-Xdebug -Xrunjdwp:transportdt_socket,servery,suspendy,address5005"运行远程调试&#xf…

阿里云国际短信业务网络超时排障指南

选取一台或多台线上的应用服务器或选取相同网络环境下的机器&#xff0c;执行以下操作。 获取公网出口IP。 curl ifconfig.me 测试连通性。 &#xff08;推荐&#xff09;执行MTR命令&#xff08;可能需要sudo权限&#xff09;&#xff0c;检测连通性&#xff0c;执行30秒。 m…

银河麒麟高级服务器操作系统V10安装达梦数据库管理系统DM8——单实例

一、介绍 之前介绍过供个人学习在VMware虚拟机上安装银河麒麟高级服务器操作系统V10&#xff0c;有兴趣的可以去看看&#xff08;银河麒麟V10安装&#xff09;&#xff0c;本次主要学习在银河麒麟V10上安装达梦数据库-DM8。DM8是达梦公司在总结DM系列产品研发与应用经验的基础…

数字系列——数字经济(2)​

上次呢&#xff0c;已经为大家捋了什么是数字经济&#xff1f;、数字经济的特点有哪些&#xff1f;和数字经济的构成&#xff0c;对于数字经济有了基础性的了解&#xff0c;今天继续为大家捋一捋。 数字经济的发展 1.互联网的普及 互联网作为数字经济的坚实基础&#xff0c;其…

第五届全国高校计算机能力挑战赛-程序设计挑战赛(C语言模拟题)

1、已有定义“int a[10]{1,2},i0;”&#xff0c;下面语句中与“ a[i]a[i1],i;”等价的是()。 A. a[i]a[i1]; B. a[i]a[i]; C. a[i]a[i1]; D. i,a[i-1]a[i]; 2、两次运行下面的程序&#xff0c;如果从键盘上分别输入6和4&#xff0c;则输出结果是(&#xff09;。 A. 7和5 …

Maven——仓库

Maven坐标和依赖是任何一个构件在Maven世界中的逻辑表示方式&#xff1b;而构件的物理表示方式是文件&#xff0c;Maven通过仓库来统一管理这些文件。 1、何为Maven仓库 在Maven世界中&#xff0c;任何一个依赖、插件或者项目构建的输出&#xff0c;都可以称为构件。例如&…

Windows11编译Hadoop3.3.6源码

由于https://github.com/kontext-tech/winutils还未发布3.3.6版本&#xff0c;因此尝试源码编译 目录 环境和安装包准备&#xff0c;见2zlib编译方法一&#xff1a;方法二&#xff1a; 配置文件更改1. maven阿里云镜像2. Node版本3. 越过Javadoc检查 编译HadoopError,其他报错…