第一章 RocketMQ 概述
一、RocketMQ 简介
1.1 消息队列
MQ(Message Queue)是一种提供消息队列服务的中间件,也称消息中间件,是一套提供了海量消息生产、存储、消费全过程 API 的软件系统。
MQ 的作用:
- 限流削峰
- 异步解耦
- 数据收集
常见的 MQ:
MQ | 开发语言 | 单机吞吐量 | Topic | 社区活跃度 |
---|---|---|---|---|
ActiveMQ | Java 语言 | 万级 | - | 低 |
RabbitMQ | ErLang 语言 | 万级 | - | 高 |
Kafka | Scala/Java 语言 | 十万级 | 百级 Topic 会影响吞吐量 | 高 |
RocketMQ | Java 语言 | 十万级 | 千级 Topic 会影响吞吐量 | 高 |
MQ 协议:
协议 | 描述 |
---|---|
JMS | JMS(Java Message Service),是 Java 平台上有关 MOM(Message Oriented Middleware 面向消息的中间件) 的技术规范,它便于消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接受消息的接口,简化企业应用的开发。 ActiveMQ 是该协议的典型实现。 |
STOMP | STOMP(Streaming Text Orientated Message Protocol),是一种 MOM 的简单文本协议。STOMP 提供一个可互操作式的连接格式,允许客户端与任意 STOMP 消息代理进行交互。 ActiveMQ 是该协议的典型实现,RabbitMQ 通过插件可以支持该协议。 |
AMQP | AMQP(Advanced Message Queuing Protocol),是一个提供统一消息服务的应用层标准,是应用层协议的一个开放标准,是一种 MOM 设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品、不同开发语言等条件的现在。 RabbitMQ是该协议的典型实现。 |
MQTT | MQTT(Message Queuing Telemetry Transport),是 IBM 开发的一个即时通讯协议,是一种二进制协议,主要用于服务器和低功耗 loT 设备间的通信。该协议支持所有平台,几乎可以把所有物联网物品和外部链接起来,被用来做传感器和制动器的通信协议。 RabbitMq 通过插件可以支持该协议。 |
1.2 RocketMQ 简介
Apache RocketMQ 是一款由阿里巴巴提供的开源的分布式消息中间件,是一个分布式消息和流式处理平台,具有低时延、高性能和高可靠性、万亿级容量和灵活扩展的特点,适用于构建海量消息堆积和异步解耦功能的应用系统。
Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。
1.3 RocketMQ 基本概念
主题(Topic):
主题是 RocketMQ 中消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。主题通过 TopicName 来做唯一标识和区分。
消息(Message):
消息是指消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。 生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消费端进行消费。
消息类型(MessageType):
RocketMQ 中按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。RocketMQ 支持的消息类 型有普通消息、顺序消息、事务消息和定时/延时消息。
RocketMQ 从5.0版本开始,支持强制校验消息类型,即每个主题 Topic 只允许发送一种消息类型的消息, 这样可以更好的运维和管理生产系统,避免混乱。但同时保证向下兼容4.x版本行为,强制校验功能默认开启。
消息队列(MessageQueue):
消息队列是 RocketMQ 中消息存储和传输的实际容器,也是消息的最小存储单元。RocketMQ 的所有主 题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。队列通过 QueueId 来做唯一标识和区分。
消息视图(MessageView):
消息视图是 Apache RocketMQ 面向开发视角提供的一种消息只读接口。通过消息视图可以读取消息内部的多个属性和负载信息,但是不能对消息本身做任何修改。
消息标签(MessageTag):
消息标签 是 RocketMQ 提供的细粒度消息分类属性,可以在主题层级之下做消息类型的细分,消费者通过订阅特定的标签来实现细粒度过滤。
消息位点(MessageQueueOffset):
消息是按到 达 RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的 Long 类型坐标,这个坐标被定义为消息位点。
消费位点(ConsumerOffset):
一条消息被 某个消费者消费完成后不会立即从队列中删除,RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。
消息索引(MessageKey):
消息索引 是 RocketMQ 提供的面向消息的索引属性,通过设置的消息索引可以快速查找到对应的消息内容。
生产者(Producer):
生产者 是 RocketMQ 系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成消息并发送至服务端。
事务检查器(TransactionChecker):
事务 检查器 RocketMQ 中生产者用来执行本地事务检查和异常事务恢复的监听器,事务检查器应该通过业务侧数据的状态来检查和判断事务消息的状态。
事务状态(TransactionResolution)
Rock etMQ 中事务消息发送过程中,事务提交的状态标识,服务端通过事务状态控制事务消息是否应该提交和投递。事务状态包括事务提交、事务回滚和事务未决。
消费者(Consumer):
消 费者是 RocketMQ 中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。
消费者分组(ConsumerGroup):
消 费者分组是 RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。
消费结果(ConsumeResult):
RocketMQ 中 PushConsumer 消费监听器处理消息完成后返回的处理结果,用来标识本次消息是否正确处理。消费结果包含消费成功和消费失败。
订阅关系(Subscription):
订阅关系是Apache RocketMQ 系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。
消息过滤:
消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在 RocketMQ 的服务端完成。
重置消费位点:
以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到 RocketMQ 服务端的消息。
消息轨迹:
在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由 RocketMQ 服务端,投递给消费者的完整链路,方便定位排查问题。
消息堆积:
生产者已经将消息发送到 RocketMQ 的服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在服务端保存着未被消费的消息,该状态即消息堆积。
事务消息:
事务消息是 RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。
定时/延时消息:
定时/延时消息是 RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。
顺序消息:
顺序消息是 RocketMQ 提供的一种高级消息类型,支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。
二、RocketMQ 系统架构
2.1 Producer
Producer 是消息生产者,负责生产消息。Producer 通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
RocketMQ 中的消息生产者都是以生产者组(Producer Group)的形式出现的,生产者组是同一类生产者的集合,这类 Producer 发送相同 Topic 类型的消息。
2.2 Consumer
Consumer 是消息消费者,负责消费消息。一个消息消费者会从 Broker 服务器中获取到消息,并对消息进行相关业务处理。
RocketMQ 中的消息消费者都是以消费者组(Consumer Group)的形式出现的,消费者组是同一类消费者的集合,这类 Consumer 消费的是同一个 Topic 类型的消息。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。
消费者组中 Consumer 的数量应该小于等于订阅 Topic 的 Queue 数量,如果超出 Queue 数量,则多出的 Consumer 将不能消费消息。
不过,一个 Topic 类型的消息可以被多个消费者组同时消费。
2.3 Name Server
NameServer 是一个 Broker 与 Topic 路由的注册中心,支持 Broker 的动态注册与发现。
NameServer 主要包括两个功能:
- Broker 管理:接受 Broker 集群的注册信息并且保存下来作为路由信息的基本数据,提供心跳检测机制,检查 Broker 是否还存活。
- 路由信息管理:每个 NameServer 中都保存着 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer 和 Conumser 通过 NameServer 可以获取整个 Broker 集群的路由信息,从而进行消息的投递和消费。
路由注册:
NameServer 通常也是以集群的方式部署,不过 NameServer 是无状态的,即 NameServer 集群中的各个节点间是无差异的,各节点间相互不进行信息通讯。在 Broker 节点启动时,轮询 NameServer 列表,与每个 NameServer 节点建立长连接,发起注册请求。在 NameServer 内部维护着一个 Broker 列表,用来动态存储 Broker 的信息。
Broker 节点为了证明自己是活着的,为了维护与 NameServer 间的长连接,会将最新的信息以心跳包的方式上报给 NameServer,每50秒发送一次心跳。心跳包中包含 BrokerIld、Broker 地址、Broker 名称、Broker 所属集群名称等等。NameServer 在接收到心跳包后,会更新心跳时间戳,记录这个 Broker 的最新存活时间。
路由剔除:
由于 Broker 关机、宕机或网络抖动等原因,NameServer 没有收到 Broker 的心跳,NameServer 可能会将其从 Broker 列表中剔除。
NameServer 中有一个定时任务,每隔10秒就会扫描一次 Broker 表,查看每一个 Broker 的最新心跳时间戳距离当前时间是否超过120秒,如果超过,则会判定 Broker 失效,然后将其从 Broker 列表中剔除。
路由发现:
RocketMQ 的路由发现采用的是 Pull 模型。当 Topic 路由信息出现变化时,NameServer 不会主动推送给客户端,而是客户端定时拉取主题最新的路由。默认客户端每30秒会拉取一次最新的路由。
客户端 NameServer 选择策略:
客户端在配置时必须要写上 NameServer集群的地址,那么客户端到底连接的是哪个 NameServer 节点呢?客户端首先会生成一个随机数,然后再与 NameServer 节点数量取模,此时得到的就是所要连接的节点索引,然后就会进行连接。如果连接失败,则会采用 round-robin 策略,逐个尝试着去连接其它节点。
首先采用的是随即策略,失败后采用轮询策略。
2.4 Broker
Broker 充当着消息中转角色,负责存储消息、转发消息。
Broker 在 RocketMQ 系统中负责接收并存储从生产者发送来的消息,同时为消费者的拉取请求作准备。Broker 同时也存储着消息相关的元数据,包括消费者组消费进度偏移 offset、主题、队列等。
Broker 功能模块:
- Remoting Module:整个 Broker 的实体,负责处理来自 clients 端的请求。
- Client Manager:客户端管理器。负责接收、解析客户端(Producer/Consumer)请求,管理客户端。例如,维护 Consumer 的 Topic 订阅信息
- Store Service:存储服务。提供方便简单的 API 接口,处理消息存储到物理硬盘和消息查询功能。
- HA Service:高可用服务,提供 Master Broker 和 Slave Broker 之间的数据同步功能。
- Index Service:索引服务。根据特定的 Message key,对投递到 Broker 的消息进行索引服务,同时也提供根据 Message Key 对消息进行快速查询的功能。
集群部署:
为了增强 Broker 性能与吞吐量,Broker 一般都是以集群形式出现的。各集群节点中可能存放着相同 Topic 的不同 Queue。每个 Broker 集群节点可以进行横向扩展,即将 Broker 节点再建为一个 HA 集群,解决单点数据丢失问题。
Broker 节点集群是一个主从集群,即集群中具有 Master 与 Slave 两种角色。
- Master 负责处理读写操作请求,而 Slave 仅负责读操作请求(Master 宕机后,Slave也负责读写操作)。
- 一个 Master 可以包含多个 Slave,但一个 Slave 只能隶属于一个 Master。
- Master 与 Slave 的对应关系是通过指定相同的 BrokerName、不同的 Brokerld 来确定的。Brokerld 为0表示 Master,非0表示 Slave。
- 每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
2.5 工作流程
- 启动 NameServer,NameServer 启动后开始监听端口,等待 Broker、Producer、Consumer 连接。
- 启动 Broker 时,Broker 会与所有的 NameServer 建立并保持长连接,然后每30秒向 NameServer 定时发送心跳包。
- 收发消息前,可以先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker上,当然,在创建 Topic 时也会将 Topic 与 Broker 的关系写入到 NameServer 中。不过,这步是可选的,也可以在发送消息时自动创建 Topic。Topic 的创建模式:
- 集群模式:该模式下创建的 Topic 在该集群中,所有 Broker 中的 Queue 数量是相同的。
- Broker 模式::该模式下创建的 Topic 在该集群中,每个 Broker 中的 Queue 数量是可以不同。
- 自动创建 Topic 时,默认采用的是 Broker 模式,会为每个 Broker 默认创建4个 Queue。
- Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从NameServer 中获取路由信息,即当前发送的 Topic 的 Queue 与 Broker 的地址(IP+Port)的映射关系。然后根据算法策略从队列中选择一个 Queue,与队列所在的 Broker 建立长连接从而向 Broker 发消息。当然,在获取到路由信息后,Producer 会首先将路由信息缓存到本地,再每30秒从 NameServer 更新一次路由信息。
- Consumer 跟 Producer 类似,跟其中一台 NameServer 建立长连接,获取其所订阅 Topic 的路由信息,然后根据算法策略从路由信息中获取到其所要消费的 Queue,然后直接跟 Broker 建立长连接,开始消费其中的消息。Consumer 在获取到路由信息后,同样也会每30秒从 NameServer 更新一次路由信息。不过不同于 Producer 的是,Consumer 还会向 Broker 发送心跳,以确保 Broker 的存活状态。
三、RocketMQ 入门
3.1 RocketMQ 安装
(1)RocketMQ 下载
进入 RocketMQ 官网
点击 Download 进入下载界面,如下。选择对应的版本下载即可,如 rocketmq-all-4.8.0-bin-release.zip
将下载完成的文件上传到 Linux 服务器中,并使用以下命令完成解压
unzip rocketmq-all-4.8.0-bin-release.zip
解压后目录如下:
(2)修改配置
修改 RocketMQ 配置,需要修改 runserver.sh 文件和 runbroker.sh 文件中的初始内存。
runserver.sh 文件
修改为:
-server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
runbroker.sh 文件
修改为:
-server -Xms256m -Xmx256m -Xmn128m"
(3)启动 RocketMQ
启动 NameServer
# 启动 nameserver
nohup sh bin/mqnamesrv &
启动结果:
在 RocketMQ 根目录下会生成一个 nohup.out 文件,查看该文件如下:
出现 The Name Server boot success,则说明 NameServer 启动成功。可以使用如下命令查看NameServer 运行日志:
# 查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log
日志如下:
启动 Broker
# 启动 broker
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &# 查看日志
tail -f ~/logs/rocketmqlogs/proxy.log
启动结果:
查看 broker 运行日志:
查看 proxy.log 日志:
(4)测试消息发送接收
发送消息:
$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
发送结果:
接收消息:
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
接收结果:
(5)关闭 RocketMQ
先关闭 Broker
sh bin/mqshutdown broker
关闭结果:
再关闭 NameServer
sh bin/mqshutdown namesrv
关闭结果:
3.2 RocketMQ 控制台
(1)下载 RocketMQ 控制台
RocketMQ 原来的控制台为 rocketmq-console,现在已经改为 rocketmq-externals 并单独列为一个项目,现在取名:rocketmq-dashboard。
rocketmq-externals 地址:https://github.com/apache/rocketmq-externals。
下载 rocketmq-dashboard 可以直接在官网下载
rocketmq-dashboard 是一个 springboot 项目,下载后解压,进入项目配置文件 application.properties,修改端口号和 NameServer 地址,如下:
修改完成之后,进入到项目个根路径,使用 cmd 命令行进行项目的打包,打包命令如下:
mvn clean package -Dmaven.test.skip=true
打包执行结果:
打包完成,生成如下 jar 包:
使用 java -jar 命令执行 jar 包,启动项目,结果如下:
启动成功之后,本地访问 localhost:9999,结果如下:
注意:
安装 RocketMQ 之后,需要开放端口 9876,10911,10912,10909
9876:NameServer 的端口
10911:Broker 对外服务的监听端口
10912:haService 中使用
10909:主要用于 slave 同步 master
四、RocketMQ 集群
4.1 数据复制与刷盘
(1)复制策略
复制策略是 Broker 的 Master 与 Slave 间的数据同步方式,分为同步复制与异步复制:
- 同步复制:消息写入 master 后,master 会等待 slave 同步数据成功后才向 producer 返回成功 ACK。
- 异步复制:消息写入 master 后,master 立即向 producer 返回成功 ACK,无需等待 slave 同步数据成功。
(2)刷盘策略
刷盘策略指的是 broker 中消息的落盘方式,即消息发送到 broker 内存后消息持久化到磁盘的方式。分为同步刷盘与异步刷盘:
- 同步刷盘:当消息持久化到 broker 的磁盘后才算是消息写入成功。
- 异步刷盘:当消息写入到 broker 的内存后即表示消息写入成功,无需等待消息持久化到磁盘。
4.2 Broker 集群模式
单 Master
只有一个 Broker,这种方式也只能是在测试时使用,生产环境下不能使用,因为存在单点问题。
多 Master
Broker 集群仅由多个 Master 构成,不存在Slave。同一 Topic 的各个 Queue 会平均分布在各个 Master 节点上。
- 优点:配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高。
- 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅(不可消费),消息实时性会受到影响。
多 Master 多 Slave-异步复制
Broker 集群由多个 Master 构成,每个 Master 又配置了多个 Slave (在配置了 RAID 磁盘阵列的情况下,一个 Master 一般配置一个 Slave 即可) 。Master 与 Slave 的关系是主备关系,即 Master 负责处理消息的读写请求,而 Slave 仅负责消息的备份与 Master 宕机后的角色切换。
异步复制即前面所讲的复制策略中的异步复制策略,即消息写入 Master 成功后,Master 立即向 producer 返回成功 ACK,无需等待 Slave 同步数据成功。
该模式的最大特点之一是,当 Master 宕机后 Slave 能够自动切换为 Master 。不过由于 Slave 从 Master 的同步具有短暂的延迟(毫秒级),所以当 Master 宕机后,这种异步复制方式可能会存在少量消息的丢失问题。
多 Master 多 Slave-同步双写
该模式是多 Master 多 Slave 模式的同步复制实现。所谓同步双写,指的是消息写入 Master 成功后,Master 会等待 Slave 同步数据成功后才向 producer 返回成功 ACK,即 Master 与 Slave 都要写入成功后才会返回成功 ACK,也即双写。
该模式与异步复制模式相比,优点是消息的安全性更高,不存在消息丢失的情况。但单个消息的 RT 略高,从而导致性能要略低(大约低10%)
该模式存在一个大的问题:对于目前的版本,Master 宕机后,Slave不能自动切换到 Master。
4.3 RocketMQ 集群配置
(1)RocketMQ 集群配置文件
- 2m-2s-async:两个 Master,两个 Slave 的异步集群配置
- 2m-2s-sync:两个 Master,两个 Slave 的同步集群配置
- 2m-noslave:两个 Master,没有 Slave 的异步集群配置
以 2m-2s-async 集群模式为例,进入到 2m-2s-async 目录,如下:
- broker-a.properties:Master a
- broker-a-s.properties:Master a 的 Slave
- broker-b.properties:Master b
- broker-b-s.properties:Master b 的 Slave
进入到 Master 配置文件 broker-a.properties,如下:
进入到 Slave 配置文件 broker-a-s.properties,如下
- brokerClusterName=DefaultCluster:指定整个 broker 集群的名称,或者说是 RocketMQ 集群的名称。
- brokerName=broker-a:指定 master-slave 集群的名称。一个 RocketMQ 集群可以包含多个 master-slave 集群。
- brokerId=0:master 的 brokerId 为0,slave 的 brokerId 为1。
- deleteWhen=04:指定删除消息存储过期文件的时间为凌晨4点。
- fileReservedTime=48:指定未发生更新的消息存储文件的保留时长为48小时,48小时后过期,将会被删除。
- brokerRole=ASYNC_MASTER:指定当前 broker 的复制策略, ASYNC_MASTER 为异步复制。
- flushDiskType=ASYNC_FLUSH:指定刷盘策略,ASYNC_FLUSH 为异步刷盘。
(2)添加配置
服务器1上需要配置如下配置文件:
- broker-a.properties:作为 Master1
- broker-b-s.properties:作为 Slave1
服务器2上需要添加如下配置文件:
- broker-b.properties:作为 Master2
- broker-a-s.properties:作为 Slave2
Master 配置文件需要添加如下配置:
- namesrvAddr:指定 Name server 的地址,如 namesrvAddr=192.168.xxx.xxx:9876;192.168. xxx.xxx:9876
Slave 配置文件需要添加如下配置:
- namesrvAddr:指定 Name server 的地址,如 namesrvAddr=192.168.xxx.xxx:9876;192.168. xxx.xxx:9876
- listenPort=11911:指定 Broker 对外提供服务的端口,即 Broker 与 producer 与 consumer 通信的端口。默认10911,由于当前主机同时充当着 master1 与 slave2,而前面的 master1 使用的是默认端口。这里需要将这两个端口加以区分,以区分出 master1 与 s1ave2
- 指定消息存储相关的路径:默认路径为 ~/store 目录。由于当前主机同时充当着 master1与slave2,master1 使用的是默认路径,这里就需要再为 slave 指定一个不同路径 ~/store-s
- storePathRootDir=~/store-s
- storePathCommitLog=~/store-s/commitlog
- storePathConsumeQueue=~/store-s/consumequeue
- storePathIndex=~/store-s /index
- storeCheckPoint=~/store-s/checkpoint
- abortFile=~/store-s/abort
(3)启动服务器
启动 NameServer:分别启动两台服务器上的 NameServer,启动命令如下:
# 启动 nameserver
nohup sh bin/mqnamesrv &# 查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log# 查看根目录下的 nohup.out 文件
tail -f nohup.out
启动结果:
启动 Master:分别启动两台服务器上的 broker master,需要指定配置文件,命令如下:
# 服务器1
# 启动 broker master
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties &# 查看日志
tail -f ~/logs/rocketmqlogs/broker.log# 服务器2
# 启动 broker master
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b.properties &# 查看日志
tail -f ~/logs/rocketmqlogs/broker.log
启动 Slave:分别启动两台服务器上的 broker slave,需要指定配置文件,命令如下:
# 服务器1
# 启动 broker master
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties &# 查看日志
tail -f ~/logs/rocketmqlogs/broker.log# 服务器2
# 启动 broker master
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a-s.properties &# 查看日志
tail -f ~/logs/rocketmqlogs/broker.log
启动完成后,通过 jps 命令可以查看运行进程
第二章 RocketMQ 工作原理
一、消息的生产
1.1 消息生产过程
Producer 可以将消息写入到某 Broker 中的某 Queue 中,其经历了如下过程:
- Producer 发送消息之前,会先向 NameServer 发出获取消息 Topic 的路由信息的请求,NameServer 返回该 Topic 的路由表及 Broker 列表。
- Producer 根据代码中指定的 Queue 选择策略,从 Queue 列表中选出一个队列,用于后续存储消息。
- Produer 对消息做一些特殊处理,例如消息本身超过4M,则会对其进行压缩。
- Producer 向选择出的 Queue 所在的 Broker 发出 RPC 请求,将消息发送到选择出的 Queue。
1.2 Queue 选择算法
对于无序消息,其 Queue 选择算法,也称为消息投递算法,常见的有两种:
- 轮询算法:默认选择算法。该算法保证了每个 Queue 中可以均匀的获取到消息。
- 最小投递延迟算法:该算法会统计每次消息投递的时间延迟,然后根据统计出的结果将消息投递到时间延迟最小的 Queue。如果延迟相同,则采用轮询算法投递。
二、消息的存储
2.1 store 目录
(1)store 目录的文件
RocketMQ 中的消息存储在本地文件系统中,这些相关文件默认在当前用户主目录下的 store 目录中。
文件说明:
- abort:该文件在 Broker 启动后会自动创建,正常关闭 Broker,该文件会自动消失。若在没有启动 Broker 的情况下,发现这个文件是存在的,则说明之前 Broker 的关闭是非正常关闭。
- checkpoint:其中存储着 commitlog、consumequeue、index 文件的最后刷盘时间戳。
- commitlog:存放 commitlog 文件,消息就是写在这个文件中。
- config:存放 Broker 运行期间的一些配置数据。
- consumequeue:存放 consumequeue 文件,队列就存放在这个目录中。consumequeue 文件是 commitlog 的索引文件,可以根据 consumequeue 定位到具体的消息。
- index:存放消息索引文件 indexFile。
- lock:存放运行期间使用到的全局资源锁。
(2)commitlog 文件
commitlog 目录中存放着很多的 mappedFile 文件,当前 Broker 中的所有消息都是落盘到这些 mappedFile 文件中的。mappedFile 文件大小为1G(小于等于1G),文件名由20位十进制数构成,表示当前文件的第一条消息的起始位移偏移量。
需要注意的是,一个 Broker 中仅包含一个 commitlog 目录,所有的 mappedFile 文件都是存放在该目录中的。即无论当前 Broker 中存放着多少 Topic 的消息,这些消息都是被顺序写入到了 mappedFile 文件中的,这些消息在 Broker 中存放时并没有被按照 Topic 进行分类存放。
消息单元:
mappedFile 文件内部消息存放结构示意图如下:
mappedFile 文件内容由一个个的消息单元构成,每个消息单元中包含一下内容:
- MsgLen:消息总长度
- PhysicalOffset:消息的物理位置
- Body:消息体内容
- BodyLength:消息体长度
- Topic:消息主题
- TopicLength:Topic 长度
- BornHost:消息生产者
- BornTimestamp:消息发送时间戳
- Queueld:消息所在的队列
- QueueOffset:消息在Queue中存储的偏移量
(3)consumequeue 文件
为了提高效率,会为每个 Topic 在 -/store/consumequeue 中创建一个目录,目录名为 Topic 名称。在该 Topic 目录下,会再为每个该 Topic 的 Queue 建立一个目录,目录名为 queueld。每个目录中存放着若干 consumequeue文件,consumequeue 文件是 commitlog 的索引文件,可以根据 consumequeue 定位到具体的消息。
consumequeue 文件名也由20位数字构成,表示当前文件的第一个索引条目的起始位移偏移量。与 mappedFile 文件名不同的是,其后续文件名是固定的,因为 consumequeue 文件大小是固定不变的。
索引条目
每个 consumequeue 文件可以包含30w个索引条目,每个索引条目包含了三个消息重要属性
- 消息在 mappedFile 文件中的偏移量 CommitLog Offset,占8个字节
- 消息长度,占4个字节
- 消息 Tag 的 hashcode 值,占8个字节
这三个属性占20个字节,所以每个文件的大小是固定的30w *20字节。
(4)文件读写
消息写入:
一条消息进入到 Broker 后经历了以下几个过程才最终被持久化:
- Broker 根据 queueld,获取到该消息对应索引条目要在 consumequeue 目录中的写入偏移量,即 QueueOffset。
- 将 queueld、queueOffset 等数据,与消息一起封装为消息单元
- 将消息单元写入到 commitlog
- 形成消息索引条目
- 将消息索引条目分发到相应的 consumerqueue
消息拉取:
当 Consumer 来拉取消息时会经历以下几个步骤:
- Consumer 获取到其要消费消息所在 Queue 的消费偏移量 offset,计算出其要消费消息的消息 offset。
- Consumer 向 Broker 发送拉取请求,其中会包含其要拉取消息的 Queue、消息 offset 及消息 Tag。
- Broker 计算在该 consumequeue 中的 queueOffset。
- 从该 queueOffset 处开始向后查找第一个指定 Tag 的索引条目。
- 解析该索引条目的前8个字节,即可定位到该消息在 commitlog 中的 commitlog offset。
- 从对应 commitlog offset 中读取消息单元,并发送给 Consumer。
RocketMQ 读写性能:
RocketMQ 对文件的读写操作是通过 mmap 零拷贝进行的,将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率。
consumequeue 中的数据是顺序存放的,还引入了 PageCache 的预读取机制,使得对 consumequeue 文件的读取几乎接近于内存读取,即使在有消息堆积情况下也不会影响性能。RocketMQ 中可能会影响性能的是对 commitlog 文件的读取。因为对 commitlog 文件来说,读取消息时会产生大量的随机访问,而随机访问会严重影响性能。不过,如果选择合适的系统 IO 调度算法,比如设置调度算法为 Deadline(采用 SSD 固态硬盘的话),随机读的性能也会有所提升。
(5)indexFile
indexFile 文件位于 store/index 目录下:
除了通过通常的指定 Topic 进行消息消费外,RocketMQ 还提供了根据 key 进行消息查询的功能,该查询就是通过 indexFile 进行索引实现的快速查询。这个 indexFile 中的索引数据是在包含了 key 的消息被发送到 Broker 时写入的。如果消息中没有包含 key,则不会写入。
indexFile 文件结构:
每个 Broker 中会包含一组 indexFile,每个 indexFile 都是以一个时间戳命名的(indexFile 被创建时的时间戳)。
使用时间戳命名好处:
根据业务 key 进行查询时,查询条件除了 key 之外,还需要指定一个要查询的时间戳,表示要查询不大于该时间戳的最新的消息,即查询指定时间戳之前存储的最新消息。这个时间戳文件名可以简化查询,提高查询效率。
每个 indexFile 文件由三部分构成
- indexHeader
- slots 槽位
- indexes 索引数据。
每个 indexFile 文件中包含500w个 slot 槽,而每个 slot 槽又可能会挂载很多的 index 索引单元。
indexHeader 固定40个字节,其中存放着如下数据:
- beginTimestamp:该 indexFile 中第一条消息的存储时间。
- endTimestamp:该 indexFile 中最后一条消息存储时间。
- beginPhyoffset:该 indexFile 中第一条消息在 commitlog 中的偏移量 commitlog offset。
- endPhyoffset:该 indexFile 中最后一条消息在 commitlog 中的偏移量 commitlog offset。
- hashSlotCount:已经填充有 index 的 slot 数量。
- indexCount:该 indexFile 中包含的索引个数。
indexFile 中最复杂的是 Slots 与 Indexes 间的关系。在实际存储时,Indexes 是在 Slots 后面的,但为了便于理解,将它们的关系展示为如下形式:
key 的 hash 值 % 500w 的结果即为 slot 槽位,然后将该 slot 值修改为该 index 索引单元的 indexNo,根据这个 indexNo 可以计算出该 index 单元在 indexFile 中的位置。
该取模结果的重复率是很高的,为了解决该问题,在每个 index 索引单元中增加了 preIndexNo,用于指定该 slot 中当前 index 索引单元的前一个 index 索引单元。而 slot 中始终存放的是其下最新的 index 索引单元的 indexNo,这样的话,只要找到了slot 就可以找到其最新的 index 索引单元,而通过这个 index 索引单元就可以找到其之前的所有 index 索引单元。
index 索引单元默写20个字节,其结构如下:
index 索引单元包含以下四个属性:
- keyHash:消息中指定的业务 key 的 hash 值。
- phyOffset:当前 key 对应的消息在 commitlog 中的偏移量 commitlog offset。
- timeDiff:当前 key 对应消息的存储时间与当前 indexFile 创建时间的时间差。
- preIndexNo:当前 slot 下当前 index 索引单元的前一个 index 索引单元的 indexNo。
三、消息的消费
3.1 消息消费
(1)消息消费类型
消费者从 Broker 中获取消息的方式有两种:拉取方式 pull 和推动方式 push。
pull:拉取方式
- Consumer 主动从 Broker 中拉取消息,主动权由 Consumer 控制。一旦获取了批量消息,就会启动消费过程,该方式实时性较弱。
push:推动方式
- 该模式下 Broker 收到数据后会主动推送给 Consumer,该方式实时较高。
- 该消费类型是典型的发布-订阅模式,即 Consumer 向其关联的 Queue 注册了监听器,一旦发现有新的消息到来就会触发回调的执行,回调方法是 Consumer 去 Queue 中拉取消息。而这些都是基于 Consumer 与 Broker 间的长连接的。长连接的维护是需要消耗系统资源的。
(2)消费模式
消费者组对于消息消费的模式又分为两种:集群消费 Clustering 和广播消费 Broadcasting。
广播模式
广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收同一个 Topic 的全量消息,即每条消息都会被发送到 Consumer Group 中的每个 Consumer。
(3)消息进度保存
广播模式:
消费进度保存在 consumer 端。因为广播模式下 consumer group 中每个 consumer 都会消费所有消息,但它们的消费进度是不同,所以 consumer 各自保存各自的消费进度。
集群模式:
消费进度保存在 broker 中。consumer group 中的所有 consumer 共同消费同一个 Topic 中的消息,同一条消息只会被消费一次,消费进度会参与到了消费的负载均衡中,故消费进度是需要共享的。
3.2 Rebalance 机制
Rebalance 即再均衡,指将一个 Topic 下的多个 Queue 在同一个 Consumer Group 中的多个 Consumer 间进行重新分配的过程。
当消费者所订阅的 Queue 数量发生变化,或消费者组中消费者的数量发生变化,就会产生 Rebalance。
- Queue 数量发生变化的场景:
- Broker 扩容或缩容
- Broker 升级运维
- Broker 与 NameServer 间的网络异常
- 消费者数量发生变化的场景:
- Consumer Group 扩容或缩容
- Consumer 升级运维
- Consumer 与 NameServer 间网络异常
Rebalance 机制的本意是为了提升消息的并行消费能力。例如,一个 Topic 下5个队列,在只有1个消费者的情况下,这个消费者将负责消费这5个队列的消息。如果此时我们增加一个消费者,那么就可以给其中一个消费者分配2个队列,给另一个分配3个队列,从而提升消息的并行消费能力。
Rebalance 限制:
由于一个队列最多分配给一个消费者,因此当某个消费者组下的消费者实例数量大于队列的数量时,多余的消费者实例将分配不到任何队列。
Rebalance 危害:
- 消费暂停:在只有一个 Consumer 时,其负责消费所有队列,在新增了一个 Consumer 后会触发 Rebalance 的发生。此时原 Consumer 就需要暂停部分队列的消费,等到这些队列分配给新的 Consumer 后,这些暂停消费的队列才能继续被消费。
- 消费重复:Consumer 在消费新分配给自己的队列时,必须接着之前 Consumer 提交的消费进度的 offset 继续消费。然而默认情况下,offset 是异步提交的,这个异步性导致提交到 Broker的 offset 与 Consumer 实际消费的消息并不一致。这个不一致的差值就是可能会重复消息的消息。
- 消费突刺:由于 Rebalance 可能导致重复消费,如果需要重复消费的消息过多,或者因为 Rebalance 暂停时间过长从而导致积压了部分消息,那么有可能会导致在 Rebalance 结束之后瞬间需要消费很多消息。
Rebalance 过程:
在 Broker 中维护着多个 Map 集合,这些集合中动态存放着当前 Topic 中 Queue 的信息、 Consumer Group 中 Consumer 实例的信息。一旦发现消费者所订阅的 Queue 数量发生变化,或消费者组中消费者的数量发生变化,立即向 Consumer Group 中的每个实例发出 Rebalance 通知。
- TopicConfigManager:key 是 topic 名称,vaue 是TopicConfig,TopicConfig 中维护着该 Topic 中所有 Queue 的数据。
- ConsumerManager:key 是 Consumser Group Id,value 是 ConsumerGroupInfo,ConsumerGroupInfo 中维护着该 Group 中所有 Consumer 实例数据。|
- ConsumerOffsetManager:key 为 Topic 与订阅该 Topic 的 Group 的组合,value 是一个内层 Map,内层 Map 的 key 为 Queueld,内层 Map 的 value 为该 Queue 的消费进度 offset。
Consumer 实例在接收到通知后会采用 Queue 分配算法自己获取到相应的 Queue,即由 Consumer 实例自主进行 Rebalance。
3.3 Queue 分配算法
(1)Queue 分配算法
一个 Topic 中的 Queue 只能由 Consumer Group 中的一个 Consumer 进行消费,那么它们间的配对关系是根据算法策略确定的,常见的有四种策略:
- 平均分配策略
- 环形平均策略
- 一致性 hash 策略
- 同机房策略
(2)平均分配策略
该算法是要根据 avg= QueueCount / ConsumerCount 的计算结果进行分配的。如果能够整除,则按顺序将 avg 个 Queue 逐个分配 Consumer,如果不能整除,则将多余出的 Queue 按照 Consumer 顺序逐个分配。
(3)环形平均策略
环形平均算法是指,根据消费者的顺序,依次在由 queue 队列组成的环形图中逐个分配。
(4)一致性 hash 策略
该算法会将 consumer 的 hash 值作为 Node 节点存放到 hash 环上,然后将 queue 的 hash 值也放到 hash 环上,通过顺时针方向,距离 queue 最近的那个 consumer 就是该 queue 要分配的 consumer。
(5)同机房策略
该算法会根据 queue 的部署机房位置和 consumer 的位置,过滤出当前 consumer 相同机房的 queue。然后按照平均分配策略或环形平均策略对同机房 queue 进行分配。如果没有同机房 queue,则按照平均分配策略或环形平均策略对所有 queue 进行分配。
(6)至少一次原则
RocketMQ 有一个原则:每条消息必须要被成功消费一次。
Consumer 在消费完消息后会向其消费进度记录器提交其消费消息的 offset,offset 被成功记录到记录器中,那么这条消费就被成功消费了。
3.4 订阅关系一致性
订阅关系的一致性指的是,同一个消费者组(Group ID 相同)下所有 Consumer 实例所订阅的 Topic与 Tag 及对消息的处理逻辑必须完全一致。否则,消息消费的逻辑就会混乱,甚至导致消息丢失。
3.5 offset 管理
消费进度 offset 是用来记录每个 Queue 的不同消费组的消费进度的。根据消费进度记录器的不同,可以分为两种模式:本地模式和沅程模式
(1)offset 本地模式
当消费模式为广播消息时,offset 使用本地模式存储。因为每条消息会被所有的消费者消费,每个消费者管理自己的消费进度,各个消费者之间不存在消费进度的交集。
Consumer 在广播消费模式下 offset 相关数据以 json 的形式持久化到 Consumer 本地磁盘文件中,默认文件路径为当前用户主目录下的 .rocketmqg_ offsets/${clientld}/${group} /Offsets.json。
- ${clientld} 为当前消费者 id,默认为 ip@DEFAULT
- ${group} 为消费者组名称
(2)offset 远程管理模式
当消费模式为集群消费时,offset 使用远程模式管理。因为所有 Cosnumer 实例对消息采用的是均衡消费,所有 Consumer 共享 Queue 的消费进度。
Consumer 在集群消费模式下offset相关数据以 json 的形式持久化到 Broker 磁盘文件中,文件路径为当前用户主目录下的 store/config/consumerOffset.json。
Broker 启动时会加载这个文件,并写入到一个双层 Map。外层 map 的 key 为 topic@group,value 为内层 map。内层 map 的 key 为 queueld,value 为 offset。当发生 Rebalance 时,新的 Consumer 会从该 Map 中获取到相应的数据来继续消费。
(3)offset 用途
消费者要消费的第一条消息是通过 consumer.setConsumeFromWhere() 方法指定起始位置的。
在消费者启动后,其要消费的第一条消息的起始位置常用的有三种,这三种位置可以通过枚举类型常量设置,这个枚举类型为 ConsumeFromWhere。
当消费完一批消息后,Consumer 会提交其消费进度 offset 给 Broker,Broker 在收到消费进度后会将其更新到那个双层 Map 及 consumerOffset.json 文件中,然后向该 Consumer 进行 ACK,而 ACK 内容中包含三项数据:当前消费队列的最小 offset (minOffset)、最大 offset (maxOffset)、及下次消费的起始 offset (nextBeginOffset)。
(4)重试队列
当 RocketMQ 对消息的消费出现异常时,会将发生异常的消息的 offset 提交到 Broker 中的重试队列。系统在发生消息消费异常时会为当前的 topic@group 创建一个重试队列,该队列以 %RETRY% 开头,到达重试时间后进行消费重试。
(5)offset 的同步提交与异步提交
集群消费模式下,Consumer 消费完消息后会向 Broker 提交消费进度 offset,其提交方式分为两种:
同步提交:消费者在消费完一批消息后会向 Broker 提交这些消息的 offset,然后等待 Broker 的成功响应。若在等待超时之前收到了成功响应,则继续读取下一批消息进行消费。若没有收到响应,则会重新提交,直到获取到响应。而在这个等待过程中,消费者是阻塞的。其严重影响了消费者的吞吐量。
异步提交:消费者在消费完一批消息后向 Broker 提交 offset,但无需等待 Broker 的成功响应,可以继续读取并消费下一批消息。这种方式增加了消费者的吞吐量。但需要注意,Broker 在收到提交的 offset 后,还是会向消费者进行响应的。|
3.6 消费幂等
(1)消息幂等
当出现消费者对某条消息重复消费的情况时,重复消费的结果与消费一次的结果是相同的,并且多次消费并未对业务系统产生任何负面影响,那么这个消费过程就是消费幂等的。
在互联网应用中,尤其在网络不稳定的情况下,消息很有可能会出现重复发送或重复消费。如果重复的消息可能会影响业务处理,那么就应该对消息做幂等处理。
(2)消息重复的场景分析
- 发送时消息重复:当一条消息已被成功发送到 Broker 并完成持久化,此时出现了网络闪断,从而导致 Broker 对 Producer 应答失败。如果此时 Producer 意识到消息发送失败并尝试再次发送消息,此时 Broker 中就可能会出现两条内容相同并且 Message ID 也相同的消息,那么后续 Constmer 就一定会消费两次该消息。
- 消费时消息重复:消息已投递到 Consumer 并完成业务处理,当 Consumer 给 Broker 反馈应答时网络闪断,Broker 没有接收到消费成功响应。为了保证消息至少被消费一次的原则,Broker 将在网络恢复后再次尝试投递之前已被处理过的消息。此时消费者就会收到与之前处理过的内容相同、Message ID 也相同的消息。
- Rebalance 时消息重复:当 Consumer Group 中的 Consumer 数量发生变化时,或其订阅的 Topic 的 Queue 数量发生变化时,会触发 Rebalance,此时 Consumer 可能会收到曾经被消费过的消息。
(3)幂等解决方案
幂等解决方案的设计中涉及到两项要素:
- 幂等令牌:是生产者和消费者两者中的既定协议,通常指具备唯一业务标识的字符串。
- 唯一性处理:服务端通过采用一定的算法策略,保证同一个业务逻辑不会被重复执行成功多次。
对于常见的系统,幂等性操作的通用性解决方案是:
- 首先通过缓存去重。在缓存中如果已经存在了某幂等令牌,则说明本次操作是重复性操作,若缓存没有命中,则进入下一步。
- 在唯一性处理之前,先在数据库中查询幂等令牌作为索引的数据是否存在。若存在,则说明本次操作为重复性操作,若不存在,则进入下一步。
- 在同一事务中完成三项操作:唯一性处理后,将幂等令牌写入到缓存,并将幂等令牌作为唯一索引的数据写入到 DB 中。
(4)消息幂等的实现
消费幂等的解决方案很简单:为消息指定不会重复的唯一标识。因为 Message ID 有可能出现重复的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 设置。
以支付场景为例,可以将消息的 Key 设置为订单号,作为幂等处理的依据。具体代码示例如下:
Message message = new Message();
message.setKey("OrderId_0001");
SendResult sendResult = producer.send(message);
消费者收到消息时可以根据消息的 Key 即订单号来实现消费幂等:
consumer.registerMessageListener(new MessageListenerConcurrently(){@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context){for(MessageExt msg: msgs){String key = msg.getKeys();// 具体根据key做幂等处理的逻辑...}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
3.7 消息堆积与消费延迟
消息处理流程中,如果 Consumer 的消费速度跟不上 Producer 的发送速度,MQ 中未处理的消息会越来越多,这部分消息就被称为堆积消息。消息出现堆积进而会造成消息的消费延迟。
以下场景需要重点关注消息堆积和消费延迟问题:
- 业务系统上下游能力不匹配造成的持续堆积,且无法自行恢复。
- 业务系统对消息的消费实时性要求较高,即使是短暂的堆积造成的消费延迟也无法接受。
消息堆积产生原因:
Consumer 使用长轮询 Pull 模式消费消息时,分为以下两个阶段:
- 消息拉取:Consumer 通过长轮询 Pull 模式批量拉取的方式从服务端获取消息,将拉取到的消息缓存到本地缓冲队列中。对于拉取式消费,在内网环境下会有很高的吞吐量,所以这一阶段一般不会成为消息堆积的瓶颈。
- 消息消费:Consumer 将本地缓存的消息提交到消费线程中,使用业务消费逻辑对消息进行处理,处理完毕后获取到一个结果。此时 Consumer 的消费能力就完全依赖于消息的消费耗时和消费并发度了,如果由于业务处理逻辑复杂等原因,导致处理单条消息的耗时较长,则整体的消息吞吐量肯定不会高,此时就会导致 Consumer 本地缓冲队列达到上限,停止从服务端拉取消息。
单机线程数据
一般情况下,消费者端的消费并发度由单节点线程数和节点数量共同决定,其值为单节点线程数*节点数量。不过,通常需要优先调整单节点的线程数,若单机硬件资源达到了上限,则需要通过横向扩展来提高消费并发度。
对于一台主机中线程池中线程数的设置需要谨慎,不能盲目直接调大线程数,设置过大的线程数反而会带来大量的线程切换的开销。理想环境下单节点的最优线程数计算模型为:C*(T1 +T2)/T1。
- C:CPU 内核数
- T1:CPU 内部逻辑计算耗时
- T2:外部 IO 操作耗时
3.8 消息的清理
消息是被顺序存储在 commitlog 文件的,且消息大小不定长,所以消息的清理是不可能以消息为单位进行清理的,而是以 commitlog 文件为单位进行清理的。否则会急剧下降清理效率,并实现逻辑复杂。
commitlog 文件存在一个过期时间,默认为72小时。除了用户手动清理外,在以下情况下也会被自动清理,无论文件中的消息是否被消费过:
- 文件过期,且到达清理时间点(默认为凌晨4点,通过参数 deleteWhen = 4 设置 )后,自动清理过期文件。
- 文件过期,且磁盘空间占用率已达过期清理警戒线(默认75%)后,无论是否达到清理时间点,都会自动清理过期文件。
- 磁盘占用率达到清理警戒线(默认85%)后,开始按照设定好的规则清理文件,无论是否过期。默认会从最老的文件开始清理。
- 磁盘占用率达到系统危险警戒线(默认90%)后,Broker 将拒绝消息写入。
第三章 RocketMQ 应用
一、生产者
1.1 普通消息
普通消息发送类型:
- 同步发送消息
- 异步发送消息
- 单向发送消息
同步发送消息:
同步发送消息是指 Producer 发出一条消息后,会在收到 MQ 返回的 ACK 之后才发下一条消息。该方式的消息可靠性最高,但消息发送效率太低。
public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");producer.setNamesrvAddr("192.168.136.10:9876");producer.start();for (int i = 0; i < 10; i++) {byte[] msgBody = ("This is SyncProducer -- " + i).getBytes();Message message = new Message("SyncProducer-TopicA", "SyncProducer-tagA", msgBody);SendResult sendResult = producer.send(message);System.out.println(sendResult);}producer.shutdown();
}
异步发送消息:
异步发送消息是指 Producer 发出消息后无需等待 MQ 返回 ACK,直接发送下一条消息。该方式的消息可靠性可以得到保障,消息发送效率也可以。
public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");producer.setNamesrvAddr("192.168.136.10:9876");producer.start();for (int i = 0; i < 10; i++) {byte[] msgBody = ("This is AsyncProducer -- " + i).getBytes();Message message = new Message("AsyncProducer-TopicA", "AsyncProducer-tagA", msgBody);producer.send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println(sendResult);}@Overridepublic void onException(Throwable throwable) {throwable.printStackTrace();}});}// 注意:添加延时关闭,避免异步发送时producer被关闭TimeUnit.SECONDS.sleep(3);producer.shutdown();
}
单项发送消息:
单向发送消息是指 Producer 仅负责发送消息,不等待、不处理 MQ 的 ACK,该发送方式时 MQ 也不返回 ACK。该方式的消息发送效率最高,但消息可靠性较差。
public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("SingWayProducer");producer.setNamesrvAddr("192.168.136.10:9876");producer.start();for (int i = 0; i < 10; i++) {byte[] msgBody = ("This is SingWayProducer -- " + i).getBytes();Message message = new Message("SingWayProducer-TopicA", "SingWayProducer-tagA", msgBody);producer.sendOneway(message);}producer.shutdown();
}
1.2 顺序消息
顺序消息指的是,严格按照消息的发送顺序进行消费的消息。
默认情况下生产者会把消息以 Round Robin 轮询方式发送到不同的 Queue 分区队列,消费者会从多个 Queue 上拉取消息,这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个 Queue 中,消费时也只从这个 Queue 上拉取消息,就严格保证了消息的顺序性。
有序性分类:
根据有序范围的不同,RocketMQ 可以严格地保证两种消息的有序性:分区有序与全局有序。
全局有序:当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序,称为全局有序。
分区有序:如果有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,则称为分区有序。
示例:
public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("OrderedProducer");producer.setNamesrvAddr("192.168.136.10:9876");producer.start();for (int i = 0; i < 10; i++) {Integer orderId = i;byte[] msgBody = ("This is OrderedProducer -- " + i).getBytes();Message message = new Message("OrderedProducer-TopicA", "OrderedProducer-tagD", msgBody);SendResult sendResult = producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object obj) {Integer id = (Integer) obj;int index = id % list.size();return list.get(index);}}, orderId);System.out.println(sendResult);}producer.shutdown();
}
1.3 延时消息
当消息写入到 Broker 后,在指定的时长后才可被消费处理的消息,称为延时消息。
采用 RocketMQ 的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交易中超时未支付关闭订单的场景,12306平台订票超时未支付取消订票的场景。
延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级定义在 RocketMQ 服务端的 Messagestoreconfig 类中的如下变量中:
- 延迟等级是从1开始计数的,即指定的延时等级为3,则表示延迟时长为10s。
- 如果需要自定义的延时等级,可以通过在 broker 加载的配置中新增如下配置,配置文件在 RocketMQ 安装目录下的 conf 目录中。
延时消息实现原理
Producer 将消息发送到 Broker 后,Broker 会首先将消息写入到 commitlog 文件,然后需要将其分发到相应的 consumequeue。在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发,若有则需要经历以下过程:
- 修改消息的 Topic 为 SCHEDULE_TOPIC_XXXX
- 根据延时等级,在 consumequeue 目录中 SCHEDULE TOPIC_XXXX 主题下创建出相应的 queueld 目录与 consumequeue 文件(如果没有这些目录与文件的话)。
- 修改消息索引单元内容。索引单元中的 Message Tag HashCode 部分原本存放的是消息的 Tag 的 Hash 值。现修改为消息的投递时间,投递时间是指该消息被重新修改为原 Topic 后再次被写入到 commitlog 中的时间。投递时间=消息存储时间+延时等级时间。消息存储时间指的是消息被发送到 Broker 时的时间戳。
- 将消息索引写入到 SCHEDULE_TOPIC_XXXX 主题下相应的 consumequeue 中。
重新投递延时消息
Broker 内部有一个延迟消息服务类 ScheuleMessageService,其会消费 SCHEDULE_TOPIC_XXXX 中的消息,即按照每条消息的投递时间,将延时消息投递到目标 Topic 中。在投递之前会从 commitlog 中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延迟的普通消息。然后再次将消息投递到目标 Topic 中。
延迟消息服务类将延迟消息再次发送给了 commitlog,并再次形成新的消息索引条目,分发到相应 Queue。
示例:
public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("DelayProducer");producer.setNamesrvAddr("192.168.136.10:9876");producer.start();for (int i = 0; i < 10; i++) {Integer orderId = i;byte[] msgBody = ("This is DelayProducer -- " + i).getBytes();Message message = new Message("DelayProducer-TopicA", "DelayProducer-tagD", msgBody);// 设置消息延时等级,3对应10smessage.setDelayTimeLevel(3);SendResult sendResult = producer.send(message);System.out.println(sendResult);}producer.shutdown();
}
1.4 事务消息
(1)相关概念
分布式事务:
对于分布式事务,通俗地说就是,一次操作由若干分支操作组成,这些分支操作分属不同应用,分布在不同服务器上。分布式事务需要保证这些分支操作要么全部成功,要么全部失败。分布式事务与普通事务一样,就是为了保证操作结果的一致性。.
事务消息:
RocketMQ 提供了类似 X/Open XA 的分布式事务功能,通过事务消息能达到分布式事务的最终一致 XA 是一种分布式事务解决方案,一种分布式事务处理模式。
半事务消息:
暂不能投递的消息,发送方已经成功地将消息发送到了 Broker,但是 Broker 未收到最终确认指令,此时该消息被标记成“暂不能投递”状态,即不能被消费者看到,处于该种状态下的消息即半事务消息。
本地事务状态:
Producer 回调操作执行的结果为本地事务状态,其会发送给 TC,而 TC 会再发送给 TM,TM 会根据 TC 发送来的本地事务状态来决定全局事务确认指令。
消息回查:
消息回查,即重新查询本地事务的执行状态。
RocketMQ中 的消息回查设置:
关于消息回查,有三个常见的属性设置,在 broker 加载的配置文件中设置:
- transactionTimeout=20,指定 TM 在20秒内应将最终确认状态发送给 TC,否则引发消息回查,默认为60秒
- transactionCheckMax=5,指定最多回查5次,超过后将丢弃消息并记录错误日志,默认15次。.
- transactionCheckInterval=10,指定设置的多次消息回查的时间间隔为10秒,默认为60秒。
(2)XA 模式
XA 协议:
XA(Unix Transaction)是一种分布式事务解决方案,一种分布式事务处理模式,基于 XA 协议。XA 协议由 Tuxedo (Transaction for Unix has been Extended for Distributed Operation,分布式操作扩展之后的 Unix 事务系统)首先提出的,并交给 X/Open 组织,作为资源管理器与事务管理器的接口标准。
XA 模式中有三个重要组件:TC、TM、RM。
- TC:Transaction Coordinator,事务协调者。维护全局和分支事务的状态,驱动全局事务提交或回滚。
- TM:Transaction Manager,事务管理器。定义全局事务的范围:开始、提交或回滚全局事务,它际是全局事务的发起者。
- RM:Resource Manager,资源管理器。管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
XA 模式架构:
XA模式是一个典型的2PC,其执行原理如下:
- TM 向 TC 发起指令,开启一个全局事务。
- 根据业务要求,各个 RM 会逐个向 TC 注册分支事务,然后 TC 会逐个向 RM 发出预执行指令。
- 各个 RM 在接收到指令后会在进行本地事务预执行。
- RM 将预执行结果 Report 给 TC,这个结果可能是成功,也可能是失败。
- TC 在接收到各个 RM 的 Report 后会将汇总结果上报给 TM,根据汇总结果 TM 会向 TC 发出确认指令。
- 若所有结果都是成功响应,则向 TC 发送 Global Commit 指令。
- 只要有结果是失败响应,则向 TC 发送 Global Rollback 指令。
- TC 在接收到指令后再次向 RM 发送确认指令。
注意:
- 事务消息不支持延时消息
- 事务消息需要做好幂等检查
示例:
1.5 批量消息
二、消费者
2.1 消费者示例
public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SyncConsumer");consumer.setNamesrvAddr("192.168.136.10:9876");// 指定消费位置consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 订阅topic和tagconsumer.subscribe("SyncProducer-TopicA", "*");// 注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : msgs) {System.out.println(msg);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();
}
-
DefaultLitePullConsumer:pull 消费者
-
DefaultMQPushConsumer:push 消费者