redis stream 作为消息队列的最详细的命令说明文档

简介

stream 作为消息队列,支持多次消费,重复消费,ack机制,消息异常处理机制。
涉及到以下几个概念,消息流,消费者组,消费者。
在这里插入图片描述
在这里插入图片描述

涉及到以下命令

# 添加消息到流中
XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]
# 创建消费则组(加上MKSTREAM,会校验消息流是否存在,不存在会创建)
XGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]
# 消费者读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]   [NOACK] STREAMS key [key ...] id [id ...]

1. 环境

redis server 7.2.2

> info server
# Server
redis_version:7.2.2

redis可视化工具(可以直接使用命令行) redisInsight 2.44.0

2. 生产消费流程测试

消息队列,涉及到如下几个流程

  1. 发送消息到消息流
  2. 创建消费者组,并进行消费
  3. 正常消费,消息确认 ack
  4. 异常消费,转移消息的归属权 claim

2.1 发送消息到消息流

使用如下命令发送消息

XADD key [NOMKSTREAM] [<MAXLEN | MINID> [= | ~] threshold [LIMIT count]] <* | id> field value [field value ...]

各参数说明

参数名说明
NOMKSTREAM默认情况下,如果消息流不存在,则会创建消息流。
使用该参数,则不会创建,如果不存在则返回 (nil)
[<MAXLEN | MINID> [= | ~] threshold [LIMIT count]]等同于 xtrim 的参数,在添加消息后,会对 stream 裁剪,将先加入的消息剔除。
MAXLEN 表示stream长度不大于 threshold,MINID 表示stream的消息id不小于 threshold;
= 表示精确删除 ~ 表示近似删除;
threshold 表示长度或者id;
limit count 表示最多剔除多少消息
<* | id>* 表示由系统生成消息id,id 表示用用户指定的消息id
field value [field value …]消息采用键值对列表形式存储
# 1.1 执行 lua 脚本,批量添加 10000 个消息
eval "local key = 'test:stream_1';redis.call('del', key); for i=1,ARGV[1],1 do redis.call('xadd', key, i + '-0', 'index' i) end;local res = redis.call('xinfo', 'stream', key); return res[6];" 0 10000
# 1.2 查看 stream 的信息
xinfo stream test:stream_1# 1.3 添加消息,后执行精确修剪。不输入 = | ~,表示使用 = 。会添加一条消息,然后删除消息,使流长度为10
xadd test:stream_1 maxlen 10  '10001-0' index 10001# 1.4 近似删除。stream 中的消息是以基数树的结构存储,一个节点可能存储多个数据,所以当某个节点中存在
# 不能删除的数据时,这个节点就不会删除,因此会导致裁剪后的数据多一些。一个节点会存储 100 个数据。
# 取消 1.3 命令,执行如下命令后,流的长度会变成 101
xadd test:stream_1 maxlen ~ 10 '10001-0' index 10001# 1.5 精确删除,根据 MINID ;添加一个消息,并将流中所有消息id 小于 1714746952323-9 删除
# 取消 1.3, 1.4 的命令,执行如下命令。结果是,保留 id >= '9001-0' 的所有数据, 流的长度会变成 1001
xadd test:stream_1 minid = '9001-0' '10001-0' index 10001
# 1.6 近似删除,根据 minid 和 limit count。
# 取消 1.3、1.4、1.5 的命令,执行如下命令。结果是,保留 id >= '9001-0' 的所有数据,并且最多删除 8950 个, 流的长度会变成 1101
# 因为限制 删除 8950 个,所以最后一个节点,计算到一半发现不能删除了,所以最后计算的节点的数据全部保留,故只删除了 8900个
xadd test:stream_1 minid ~ '9001-0' limit 8950 '10001-0' index 10001

为什么 xadd 需要添加 xtrim 的操作呢?因为有些消息,如果闲置的时间太长是要废弃掉的;所以可以加上这个。

xinfo stream test:stream_0 返回的结果字段中
radix-tree-keys:表示有几个id节点,一个id节点 至多会存储 100 个 id
radix-tree-nodes: radix tree 节点数量

2. 创建消费者组,消费消息

XGROUP CREATE key group <id | $> [MKSTREAM] [ENTRIESREAD entries-read]
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]   [NOACK] STREAMS key [key ...] id [id ...]

2.1 创建消费者组

MKSTREAM: 表示如果 stream 不存在则创建。如果不加 MKSTREAM参数,且stream不存在,执行的 xgroup create 会报错。
ENTRIESREAD:Redis version 7.0.0 可以添加此参数。如果使用了 ENTRIESREAD entries-read 参数, 设置 entries-read 已消费数量;lag 待消费数量, entries-read + lag 等于总数(包含已删除的消息数)

# id = 0 表示 从头开始消费
# id = 具体id,表示从指定 id 之后开始消费,不包含当前id
# id = $ 消费新消息
# 每一个消费者组都有一个 last_delivered_id 记录发送的最后一个消息id, 相互之间不会影响,比如来了一个新消息加入到队列中,通过 xreadgroup 可以让每一个消费者组都消费
# last_delivered_id = 0-0, 1714581497948-0, stream 中的最大的id
xgroup create test:stream_1 test:stream_1:group_0 0
# last_delivered_id = 1714581497948-0
xgroup create test:stream_1 test:stream_1:group_1 1714581497948-0
# last_delivered_id = stream 中的最大的id
xgroup create test:stream_1 test:stream_1:group_2 $# 执行一下命令之后 消费者组的 entries-read = 1, lag = stream.entries-added - entries-read
xgroup create test:stream_1 test:stream_1:group_3 0 ENTRIESREAD 1
xgroup create test:stream_1 test:stream_1:group_4 1714581525681-0 ENTRIESREAD 1
xgroup create test:stream_1 test:stream_1:group_5 $ ENTRIESREAD 1

2.2 拉取消息消费

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]   [NOACK] STREAMS key [key ...] id [id ...]
  • GROUP group consumer: 与stream 绑定的 消费者组、消费者
  • COUNT count: 查找最大消息数量
  • BLOCK milliseconds: 如果一条消息都没有,阻塞多少时间
  • NOACK: 无需消息确认。相当于在读取的时候就已经确认消息了。
  • STREAMS key [key …] id [id …]
    • id 为 “>” 表示取 stream 中 message_id > consumer_group.last_delivered_id 的消息
    • id 为特定数字,表示 从 padding_list 中取 message_id > id 消息。 使用了具体id, BLOCK 和 NOACK 无效。
# 获取全部未消费的消息
XREADGROUP GROUP test:stream_1:group_0 test:stream_1:group_0:consumer_0 STREAMS test:stream_1 >
# 获取至多 10 条消息;若一条消息都没有,等待 20 秒。超时返回 nil
XREADGROUP GROUP test:stream_1:group_0 test:stream_1:group_0:consumer_0 count 10 BLOCK 20000 STREAMS test:stream_1 >
# 获取正在消费中的消息
XREADGROUP GROUP test:stream_1:group_0 test:stream_1:group_0:consumer_0 count 100 STREAMS test:stream_1 0

3. 正常消费,消息确认 ack

XACK key group id [id ...]

key 流名称
group 组名称
id 消息id

# 读取pel列表(pedding entries list: 消费中的列表)的消息的id,并确认
eval "local key = 'test:stream_1';local list = redis.call('xreadgroup','group','test:stream_1:group_0','test:stream_1:group_0:consumer_0','STREAMS',key,0); local entries = list[1][2];local sum = 0; for i=1,#entries, 1 do sum = sum + redis.call('xack', key, 'test:stream_1:group_0', entries[i][1]); end; return sum;" 0

4. 异常消费,转移消息的归属权 claim

XCLAIM key group consumer min-idle-time id [id ...] [IDLE ms] [TIME unix-time-milliseconds] [RETRYCOUNT count] [FORCE] [JUSTID] [LASTID lastid]
  • min-idle-time:最小闲置时间,如果闲置时间小于min-idle-time,则不处理
  • IDLE :设置消息的空闲时间(上次发送时间)。如果未指定 IDLE,则假定 IDLE 为 0,即重置时间计数,因为该消息现在有一个新所有者正在尝试处理它。
  • TIME :这与 IDLE 相同,但不是相对的毫秒数,而是将空闲时间设置为特定的 Unix 时间(以毫秒为单位)。像当于设置下发时间
  • RETRYCOUNT :将重试计数器设置为指定值。每次再次传送消息时,该计数器都会递增。通常XCLAIM不会更改此计数器,该计数器仅在调用 XPENDING 命令时提供给客户端:这样客户端可以检测异常情况,例如在大量传递尝试后由于某种原因从未处理的消息。
  • FORCE:即使某些指定的 ID 尚未在分配给其他客户端的 PEL 中,也会在 PEL 中创建待处理消息条目。但是该消息必须存在于流中,否则不存在的消息的 ID 将被忽略。
  • JUSTID:仅返回成功领取的消息ID数组,不返回实际消息。使用此选项意味着重试计数器不会增加。
# 强制处理id为 11-0 的,闲置时间大于 1 小时的消息;设置闲置时间为 0
xclaim test:stream_1 test:stream_1:group_0 test:stream_1:group_0:consumer_2 3600000 '11-0' IDLE 0 TIME 15 RETRYCOUNT 1 FORCE JUSTID
# 设置下发时间,并返回待处理消息列表
eval "local key = 'test:stream_1';local group = 'test:stream_1:group_0';local consumer = 'test:stream_1:group_0:consumer_2';local id = '11-0';local t = redis.call('time');local time = t[1] * 1000;redis.call('xclaim', key, group, consumer, 3600, id, 'TIME', time, 'RETRYCOUNT', 1, 'FORCE', 'JUSTID');return redis.call('xpending', key, group, '-', '+', 10, consumer);" 0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/670809.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++初阶学习第五弹——类与对象(下)——类与对象的收官战

类与对象&#xff08;上&#xff09;&#xff1a;C初阶学习第三弹——类与对象&#xff08;上&#xff09;——初始类与对象-CSDN博客 类与对象&#xff08;中&#xff09;&#xff1a;C初阶学习第四弹——类与对象&#xff08;中&#xff09;——刨析类与对象的核心点-CSDN博…

基于大模型的idea提炼:围绕论文和引用提炼idea之ResearchAgent

前言 对本博客比较熟悉的朋友知道&#xff0c;我司论文项目组正在基于大模型做论文的审稿(含CS英文论文审稿、和金融中文论文审稿)、翻译&#xff0c;且除了审稿翻译之外&#xff0c;我们还将继续做润色/修订、idea提炼(包含论文检索)&#xff0c;是一个大的系统&#xff0c;包…

LangChain 概念篇(喂饭级)

LangChain 介绍 LangChain 是一个用于开发由语言模型驱动的应用程序的框架。 LangChain 框架的设计目标 支持应用程序让其不仅会通过 API 调用语言模型&#xff0c;而且还会数据感知&#xff08;将语言模型连接到其他数据源&#xff09;&#xff0c;Be agentic&#xff08;允…

华为eNSP小型园区网络配置(上)

→跟着大佬学习的b站直通车← 目标1&#xff1a;dhcp分配ip地址 目标2&#xff1a;内网用户访问www.yzy.com sw1 # vlan batch 10 # interface Ethernet0/0/1port link-type accessport default vlan 10 # interface Ethernet0/0/2port link-type trunkport trunk allow-pass…

electron 通信总结

默认开启上下文隔离的情况下 渲染进程调用主进程方法&#xff1a; 主进程 在 main.js 中&#xff0c; 使用 ipcMain.handle&#xff0c;添加要处理的主进程方法 const { ipcMain } require("electron"); 在 electron 中创建 preload.ts 文件&#xff0c;从 ele…

FreeRTOS资源管理

1.以前临界资源的保护方式 有使用过静态局部变量来保护临界资源&#xff0c;也有用队列&#xff0c;信号量&#xff0c;互斥量来保护临界资源。这些都是在多个任务会共同使用临界资源的情况下我们的保护方式。 问题提出&#xff1a;如果有个传感器在读取数据时有严格的时序&a…

奶爸预备 |《伯克毕生发展心理学.从0岁到青少年》 / (美) 劳拉·E. 伯克著——读书笔记

目录 引出第一篇 人的发展理论与研究第1章 历史、理论和研究方法 第二篇 发展的基础第2章 生物基础与环境基础第3章 孕期发育、分娩及新生儿 第三篇 婴儿期和学步期&#xff1a;0~2岁第4章 婴儿期和学步期的身体发育第5章 婴儿期和学步期的认知发展第6章 婴儿期和学步期的情绪与…

【一步一步了解Java系列】:探索Java基本类型转换的秘密

看到这句话的时候证明&#xff1a;此刻你我都在努力~ 加油陌生人~ 个人主页&#xff1a; Gu Gu Study ​​ 专栏&#xff1a;一步一步了解Java 喜欢的一句话&#xff1a; 常常会回顾努力的自己&#xff0c;所以要为自己的努力留下足迹。 如果喜欢能否点个赞支持一下&#…

【LeetCode刷题】739. 每日温度(单调栈)

1. 题目链接2. 题目描述3. 解题方法4. 代码 1. 题目链接 739. 每日温度 2. 题目描述 3. 解题方法 用一个栈st保存每个数的下标&#xff0c;同时创建一个数组res保存结果&#xff0c;初始值都为0。循环遍历题目中的数组temperature。如果temperature[i] > st.top()&#x…

3W 3KVDC 隔离单输出 DC/DC 电源模块——TPG-3W 系列

TPG-3W系列是一款额定功率为3W的隔离产品&#xff0c;国际标准引脚&#xff0c;宽范围工作、温度–40℃ 到 105℃&#xff0c;在此温度范围内都可以稳定输出3W&#xff0c;并且效率非常高&#xff0c;高达88%&#xff0c;同时负载调整率非常低&#xff0c;对于有输出电压精度有…

华为ensp中USG6000V防火墙双机热备VRRP+HRP原理及配置

作者主页&#xff1a;点击&#xff01; ENSP专栏&#xff1a;点击&#xff01; 创作时间&#xff1a;2024年5月6日20点26分 华为防火墙双机热备是一种高可用性解决方案&#xff0c;可以将两台防火墙设备组成一个双机热备组&#xff0c;实现主备切换。当主用防火墙出现故障时&…

Socket编程--TCP连接以及并发处理

文章目录 流程图API阻塞IO/非阻塞IOcode 流程图 网络传输流程&#xff1a; TCP连接&#xff1a; API 客户端&#xff1a; socket: 创建套接字 domain: AF_INET &#xff1a;IPv4 type: SOCK_STREAM(tcp)、SOCK_DGRAM&#xff08;udp&#xff09; protocol: 0 默认协议 返…