消息队列:字面意思就是存放消息的队列。使用队列的好处在于解耦 。最简单的消息队列模型包括3个角色:
消息队列:存储和管理消息,也被称为消息代理(Message Broker)
生产者:发送消息到消息队列
消费者:从消息队列获取消息并处理消息
Redis消息队列
- 基于List实现消息队列
- 基于PubSub的消息队列
- 基于Stream的消息队列
基于List结构模拟消息队列
Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null。
优点:
利用Redis存储
基于Redis的持久化机制,数据安全性有保证
可以满足消息有序性
缺点:
无法避免消息丢失
不能重复消费
只支持单消费者
基于PubSub的消息队列
PubSub(发布订阅)消息传递模型。消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
SUBSCRIBE channel [channel] :订阅一个或多个频道PUBLISH channel msg :向一个频道发送消息PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
发布订阅机制存在以下缺点,都是跟丢失数据有关:
- 发布/订阅机制没有基于任何数据类型实现,所以不具备【数据持久化】的能力,也就是发布/订阅机制的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,发布/订阅机制的数据也会全部丢失。
- 发布订阅模式是“发后既忘”的工作模式,如果有订阅者离线重连之后不能消费之前的历史消息。
- 当消费端有一定的消息积压时,也就是生产者发送的消息,消费者消费不过来时,如果超过 32M 或者是 60s 内持续保持在 8M 以上,消费端会被强行断开,这个参数是在配置文件中设置的,默认值是
client-output-buffer-limit pubsub 32mb 8mb 60
。所以,发布/订阅机制只适合即时通讯的场景。
基于Stream的消息队列
支持消息的持久化、支持自动生成全局唯一 ID、支持 ack 确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠。大致流程如下
- 消息保序:XADD/XREAD
- 阻塞读取:XREAD block
- 重复消息处理:Stream 在使用 XADD 命令,会自动生成全局唯一 ID;
- 消息可靠性:内部使用 PENDING List 自动保存消息,使用 XPENDING 命令查看消费组已经读取但是未被确认的消息,消费者使用 XACK 确认消息;
- 支持消费组形式消费数据
Stream 消息队列操作命令
- XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
- XLEN :查询消息长度;
- XREAD:用于读取消息,可以按 ID 读取数据;
- XDEL : 根据消息 ID 删除消息;
- DEL :删除整个 Stream;
- XRANGE :读取区间消息
- XREADGROUP:按消费组形式读取消息;
- XPENDING 命令可以用来查询每个消费组内所有消费者「已读取、但尚未确认」的消息
- XACK 命令用于向消息队列确认消息处理已完成
Stream 可以以使用 XGROUP 创建消费组,创建消费组之后,Stream 可以使用 XREADGROUP 命令让消费组内的消费者读取消息
从消费者组读取消息:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group:消费组名称
consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
count:本次查询的最大数量
BLOCK milliseconds:当没有消息时最长等待时间
NOACK:无需手动ACK,获取到消息后自动确认
STREAMS key:指定队列名称
ID:获取消息的起始ID:
">":从下一个未消费的消息开始 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始。
PENDING List
pending-list确保消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息
Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。
消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成,整个流程的执行如下图所示
消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息
#查看一下 group 中各个消费者已读取、但尚未确认的消息个数
XPENDING KEY GROUP#想查看某个消费者具体读取了哪些数据
XPENDING KEY GROUP - + start end count#XACK 命令通知 Streams
XACK KEY GROUP consumerid
消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了,即同一个消费组里的消费者不能消费同一条消息。
比如说,我们执行完刚才的 XREADGROUP 命令后,再执行一次同样的命令,此时读到的就是空值了:
> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
(nil)
但是,不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消息组的时候,不同消费组指定了相同位置开始读取消息)