RocketMQ 是一款功能强大的分布式消息系统,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
RocketMQ 源码地址:https://github.com/apache/rocketmq(opens new window)
RocketMQ 官方网站:https://rocketmq.apache.org(opens new window)
文章描述 RocketMQ 相关概念和知识,如无特别声明,均是 Apache RocketMQ 4.x 版本。
🔥 SpringBoot Ladder (opens new window):从零到一学习 SpringBoot 各种组件框架实战的项目,让 Demo 变得简单。咱们文章中的 RocketMQ 示例也在这个项目。
#什么场景下用 RocketMQ?
#1. 异步解耦
最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法有以下两种:
串行方式
串行方式下的注册流程如下图所示。
数据流动如下所述:
- 您在注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入注册系统。
- 注册信息写入注册系统成功后,再发送请求至邮件通知系统。邮件通知系统收到请求后向用户发送邮件通知。
- 邮件通知系统接收注册系统请求后再向下游的短信通知系统发送请求。短信通知系统收到请求后向用户发送短信通知。
以上三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。
假设每个任务耗时分别为50ms,则用户需要在注册页面等待总共150ms才能登录。
并行方式
并行方式下的注册流程如下图所示。
数据流动如下所述:
- 用户在注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入注册系统。
- 注册信息写入注册系统成功后,再同时发送请求至邮件和短信通知系统。邮件和短信通知系统收到请求后分别向用户发送邮件和短信通知。
以上两个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。
假设每个任务耗时分别为50ms,其中,邮件和短信通知并行完成,则用户需要在注册页面等待总共100ms才能登录。
异步解耦
对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,后续的注册短信和邮件不是即时需要关注的步骤。
对于注册系统而言,发送注册成功的短信和邮件通知并不一定要绑定在一起同步完成,所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的 RocketMQ 中然后马上返回用户结果,由 RocketMQ 异步地进行这些操作。
数据流动如下所述:
- 用户在注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入注册系统。
- 注册信息写入注册系统成功后,再发送消息至 RocketMQ。 RocketMQ 会马上返回响应给注册系统,注册完成。用户可立即登录。
- 下游的邮件和短信通知系统订阅 RocketMQ 的此类注册请求消息,即可向用户发送邮件和短信通知,完成所有的注册流程。
用户只需在注册页面等待注册数据写入注册系统和 RocketMQ 的时间,即等待55ms即可登录。
#2. 削峰填谷
流量削峰也是 RocketMQ 的常用场景,一般在秒杀或团队抢购活动中使用广泛。
在秒杀或团队抢购活动中,由于用户请求量较大,导致流量暴增,秒杀的应用在处理如此大量的访问流量后,下游的通知系统无法承载海量的调用量,甚至会导致系统崩溃等问题而发生漏通知的情况。为解决这些问题,可在应用和下游通知系统之间加入 RocketMQ。
秒杀处理流程如下所述:
- 用户发起海量秒杀请求到秒杀业务处理系统。
- 秒杀处理系统按照秒杀处理逻辑将满足秒杀条件的请求发送 RocketMQ。
- 下游的通知系统订阅 RocketMQ 的秒杀相关消息,再将秒杀成功的消息发送到相应用户。
- 用户收到秒杀成功的通知。
#3. 顺序消息
顺序消息是 RocketMQ 提供的一种对消息发送和消费顺序有严格要求的消息。
对于一个指定的 Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。
顺序消息分为分区顺序消息和全局顺序消息。
- 分区顺序消息:对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。
- 适用场景:适用于性能要求高,以 Sharding Key 作为分区字段,在同一个区块中严格地按照先进先出(FIFO)原则进行消息发布和消费的场景。
- 示例
- 用户注册需要发送验证码,以用户 ID 作为 Sharding Key,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
- 电商的订单创建,以订单 ID 作为 Sharding Key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。
- 全局顺序消息:对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序来发布和消费。
- 适用场景:适用于性能要求不高,所有的消息严格按照 FIFO 原则来发布和消费的场景。
- 示例:在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以按照 FIFO 的方式发布和消费全局顺序消息。
全局顺序消息实际上是一种特殊的分区顺序消息,即 Topic 中只有一个分区,因此全局顺序和分区顺序的实现原理相同。因为分区顺序消息有多个分区,所以分区顺序消息比全局顺序消息的并发度和性能更高。
#4. 分布式模缓存同步
双十一大促时,各个分会场会有琳琅满目的商品,每件商品的价格都会实时变化。使用缓存技术也无法满足对商品价格的访问需求,缓存服务器网卡满载。访问较多次商品价格查询影响会场页面的打开速度。
此时需要提供一种广播机制,一条消息本来只可以被集群的一台机器消费,如果使用 RocketMQ 的广播消费模式,那么这条消息会被所有节点消费一次,相当于把价格信息同步到需要的每台机器上,取代缓存的作用。
#5. 分布式定时/延时调度
RocketMQ 提供精确度到秒级的分布式定时消息能力(5.0架构后),可广泛应用于订单超时中心处理、分布式延时调度系统等场景。
使用 RocketMQ 定时消息有如下优势:
- 定时精度高、开发门槛低:消息定时时间不存在阶梯间隔,可以轻松实现任意精度事件触发,无需业务去重。
- 高性能、可扩展:传统的定时实现方案较为复杂,需要进行数据库扫描,容易遇到性能瓶颈的问题,RocketMQ 可以基于定时消息特性完成事件驱动,实现百万级消息 TPS 能力。
#RocketMQ 基础概念
#1. 主题 Topic
主题是 Apache RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。 主题的作用主要如下:
- 定义数据的分类隔离: 在 Apache RocketMQ 的方案设计中,建议将不同业务类型的数据拆分到不同的主题中管理,通过主题实现存储的隔离性和订阅隔离性。
- 定义数据的身份和权限: Apache RocketMQ 的消息本身是匿名无身份的,同一分类的消息使用相同的主题来做身份识别和权限管理。
#2. 队列 Queue
队列是 Apache RocketMQ 中消息存储和传输的实际容器,也是 Apache RocketMQ 消息的最小存储单元。 Apache RocketMQ 的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。
#3. 消息 Message
消息是 Apache RocketMQ 中的最小数据传输单元。生产者将业务数据的负载和拓展属性包装成消息发送到 Apache RocketMQ 服务端,服务端按照相关语义将消息投递到消费端进行消费。
#4. 生产者 Producer
发布消息的角色。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败和重试。
#5. 消费者 Consumer
消息消费的角色。
- 支持以推(push),拉(pull)两种模式对消息进行消费。
- 同时也支持集群方式和广播方式的消费。
- 提供实时消息订阅机制,可以满足大多数用户的需求。
#6. 名字服务器 NameServer
NameServer 是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。
主要包括两个功能:
- Broker管理,NameServer 接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查 Broker 是否还存活;
- 路由信息管理,每个 NameServer 将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer 和 Consumer 通过 NameServer 就可以知道整个 Broker 集群的路由信息,从而进行消息的投递和消费。
NameServer 通常会有多个实例部署,各实例间相互不进行信息通讯。Broker 是向每一台 NameServer 注册自己的路由信息,所以每一个 NameServer 实例上面都保存一份完整的路由信息。当某个 NameServer 因某种原因下线了,客户端仍然可以向其它 NameServer 获取路由信息。
#7. 代理服务器 Broker
Broker主要负责消息的存储、投递和查询以及服务高可用保证。
NameServer 几乎无状态节点,因此可集群部署,节点之间无任何信息同步。Broker 部署相对复杂。
在 Master-Slave 架构中,Broker 分为 Master 与 Slave。一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master。Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。
部署模型小结:
- 每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
- Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。
- Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息。
#RocketMQ 工作原理
#1. 启动 NameServer
启动 NameServer。NameServer 启动后监听端口,等待 Broker、Producer、Consumer 连接,相当于一个路由控制中心。
#2. 启动 Broker
启动 Broker。与所有 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
#3. 创建 Topic
创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建 Topic。
#4. 生产者发送消息
生产者发送消息。启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在于哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker发消息。
#5. 消费者接受消息
消费者接受消息。跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,然后开始消费消息。
#动手发一条消息
#1. 启动 RocketMQ
星球用户直接使用公用 RocketMQ 中间件启动即可,跳过该小节。
安装 NameServer。
docker run -d -p 9876:9876 --name rmqnamesrv foxiswho/rocketmq:server-4.5.1
安装 Brocker。
1)新建配置目录。
如果是 Windows 需要替换为 Windows 的电脑路径,和 Linux 还是有点差异。
mkdir -p ${HOME}/docker/software/rocketmq/conf
2)新建配置文件 broker.conf。
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 此处为本地ip, 如果部署服务器, 需要填写服务器外网ip
brokerIP1 = xx.xx.xx.xx
3)创建容器。
docker run -d \
-p 10911:10911 \
-p 10909:10909 \
--name rmqbroker \
--link rmqnamesrv:namesrv \
-v ${HOME}/docker/software/rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf \
-e "NAMESRV_ADDR=namesrv:9876" \
-e "JAVA_OPTS=-Duser.home=/opt" \
-e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m" \
foxiswho/rocketmq:broker-4.5.1
安装 RocketMQ 控制台。
docker pull pangliang/rocketmq-console-ng
docker run -d \
--link rmqnamesrv:namesrv \
-e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=namesrv:9876 -Drocketmq.config.isVIPChannel=false" \
--name rmqconsole \
-p 8088:8080 \
-t pangliang/rocketmq-console-ng
运行成功,稍等几秒启动时间,浏览器输入 localhost:8088
查看控制台。
#2. 发送普通消息
下述完整 Demo 详情查看 springboot-ladder/mq-rocketmq-4x (opens new window)项目模块。
#2.1 引入 RocketMQ 依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version>
</dependency>
#2.2 启动自动装配
因为咱们 Demo 中使用的是 SpringBoot3,RocketMQ 最新版本 2.2.3 没有适配 SpringBoot3,所以需要手动搞定自动装配。
如果 SpringBoot2 版本,就不需要执行这一步。
resources 目录下创建 META-INF/spring 目录,并创建org.springframework.boot.autoconfigure.AutoConfiguration.imports
文件。
# RocketMQ 2.2.3 version does not adapt to SpringBoot3
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
#2.3 消息生产者
配置文件中引入 RocketMQ 相关配置定义,比如连接 NameServer 地址等。
server:port: 6060rocketmq:name-server: 127.0.0.1:9876 # NameServer 地址producer:group: rocketmq-4x-service_common-message-execute_pg # 全局发送者组定义
定义消息生产者,通过 RocketMQTemplate
向 RocketMQ 发送普通常规消息。
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.nageoffer.springbootladder.rocketmq4x.event.GeneralMessageEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;/*** 普通消息发送者** @公众号:马丁玩编程,回复:加群,添加马哥微信(备注:ladder)获取更多项目资料*/
@Slf4j
@Component
@RequiredArgsConstructor
public class GeneralMessageDemoProduce {private final RocketMQTemplate rocketMQTemplate;/*** 发送普通消息** @param topic 消息发送主题,用于标识同一类业务逻辑的消息* @param tag 消息的过滤标签,消费者可通过Tag对消息进行过滤,仅接收指定标签的消息。* @param keys 消息索引键,可根据关键字精确查找某条消息* @param messageSendEvent 普通消息发送事件,自定义对象,最终都会序列化为字符串* @return 消息发送 RocketMQ 返回结果*/public SendResult sendMessage(String topic, String tag, String keys, GeneralMessageEvent messageSendEvent) {SendResult sendResult;try {StringBuilder destinationBuilder = StrUtil.builder().append(topic);if (StrUtil.isNotBlank(tag)) {destinationBuilder.append(":").append(tag);}Message<?> message = MessageBuilder.withPayload(messageSendEvent).setHeader(MessageConst.PROPERTY_KEYS, keys).setHeader(MessageConst.PROPERTY_TAGS, tag).build();sendResult = rocketMQTemplate.syncSend(destinationBuilder.toString(),message,2000L);log.info("[普通消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);} catch (Throwable ex) {log.error("[普通消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);throw ex;}return sendResult;}
}
#2.4 消息消费者
定义消息消费者,从 RocketMQ Broker 拉取对应 Topic Tag 的消息列表。
import com.alibaba.fastjson.JSON;
import com.nageoffer.springbootladder.rocketmq4x.event.GeneralMessageEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** 普通消息消费者** @公众号:马丁玩编程,回复:加群,添加马哥微信(备注:ladder)获取更多项目资料*/
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(topic = "rocketmq-demo_common-message_topic",selectorExpression = "general",consumerGroup = "rocketmq-demo_general-message_cg"
)
public class GeneralMessageDemoConsume implements RocketMQListener<GeneralMessageEvent> {@Overridepublic void onMessage(GeneralMessageEvent message) {log.info("接到到RocketMQ消息,消息体:{}", JSON.toJSONString(message));}
}
#2.5 发送一条消息
定义消息发送程序,这里为了避免类过多,直接写在 SpringBoot 的启动程序里。发送普通消息的方法返回值就是发送 RocketMQ Broker 返回的状态码,成功的话就是 SEND_OK
。
import com.nageoffer.springbootladder.rocketmq4x.event.GeneralMessageEvent;
import com.nageoffer.springbootladder.rocketmq4x.produce.GeneralMessageDemoProduce;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.UUID;@RestController
@RequiredArgsConstructor
@SpringBootApplication
@Tag(name = "RocketMQ发送示例", description = "RocketMQ发送示例启动器")
public class RocketMQDemoApplication {private final GeneralMessageDemoProduce generalMessageDemoProduce;@PostMapping("/test/send/general-message")@Operation(summary = "发送RocketMQ普通消息")public String sendGeneralMessage() {String keys = UUID.randomUUID().toString();GeneralMessageEvent generalMessageEvent = GeneralMessageEvent.builder().body("消息具体内容,可以是自定义对象,最终都会序列化为字符串。如果是取消订单,这里应该是订单ID或者相关联的信息").keys(keys).build();SendResult sendResult = generalMessageDemoProduce.sendMessage("rocketmq-demo_common-message_topic","general",keys,generalMessageEvent);return sendResult.getSendStatus().name();}public static void main(String[] args) {SpringApplication.run(RocketMQDemoApplication.class, args);}
}
项目中引入了 Swagger3,通过界面 UI 发送一条消息测试效果。访问 http://127.0.0.1:6060/swagger-ui/index.html
,调用定义的发送 RocketMQ 普通消息方法。
点击 Execute 执行方法调用。
通过方法调用得知,返回数据为成功。
也能看到 RocketMQ 对应的生产者和消费者对应日志。
2023-09-24T17:38:57.457+08:00 INFO 48437 --- [nio-6060-exec-6] c.n.s.r.p.GeneralMessageDemoProduce : [普通消息] 消息发送结果:SEND_OK,消息ID:7F000001BD35251A69D77A3BC5280002,消息Keys:7a60c853-08dc-46cd-a647-398d45b54966
2023-09-24T17:38:57.459+08:00 INFO 48437 --- [al-message_cg_3] c.n.s.r.c.GeneralMessageDemoConsume : 接到到RocketMQ消息,消息体:{"body":"消息具体内容,可以是自定义对象,最终都会序列化为字符串。如果是取消订单,这里应该是订单ID或者相关联的信息","keys":"7a60c853-08dc-46cd-a647-398d45b54966"}
#3. 扩展框架 SpringCloud Stream
Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration
与 Broker 进行连接。
Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。
Spring Cloud Stream 内部有两个概念:Binder 和 Binding。
- Binder:跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。
比如 Kafka
的实现 KafkaMessageChannelBinder
,RabbitMQ
的实现 RabbitMessageChannelBinder
以及 RocketMQ
的实现 RocketMQMessageChannelBinder
。
- Binding:包括 Input Binding 和 Output Binding。
Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。
下图是 Spring Cloud Stream 的架构设计。
SpringCloud Stream RocketMQ 不是咱们本次介绍的重点,所以只是抛砖引玉,大家需要了解详情参考:RocketMQ Example(opens new window)
#RocketMQ 部署架构
#1. 本地部署
#1.1 单组节点单副本模式
这种方式风险较大,因为 Broker 只有一个节点,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用, 可以用于本地测试。
#1.2 多组节点(集群)单副本模式
一个集群内全部部署 Master 角色,不部署 Slave 副本,例如2个 Master 或者3个 Master,这种模式的优缺点如下:
- 优点:配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
- 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
#2. 生产部署
#2.1 多节点(集群)多副本模式-异步复制
每个 Master 配置一个 Slave,有多组 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
- 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时 Master 宕机后,消费者仍然可以从 Slave 消费,而且此过程对应用透明,不需要人工干预,性能同多 Master 模式几乎一样;
- 缺点:Master 宕机,磁盘损坏情况下会丢失少量消息。
#2.2 多节点(集群)多副本模式-同步双写
每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
- 优点:数据与服务都无单点故障,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
- 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。