Redis队列Stream

1 缘起

项目中处理文件的场景:
将文件处理请求放入队列,
一方面,缓解服务器文件处理压力;
另一方面,可以根据文件大小拆分到不同的队列,提高文件处理效率。
这是Java开发组Leader佳汇提出的文件处理方案,非常实用。
从他那学习到之后,开始搜集Redis Stream相关的知识,整理成文,帮助开发者轻松应对知识交流和考核。

2 Redis Stream

Redis Stream是Redis 5.0.0版本新增的数据结构,想使用Stream需要Redis的最低版本是5.0
Stream是一个高性能、高可靠的消息队列,用于异步消息处理,就是传统的队列功能,完成流量削峰。Redis 5.0之前的版本就有提供队列功能,如列表、有序集合和Pub/Sub均可实现队列功能。既然Redis已经有了队列功能,为什么还要Stream这个数据结构呢?
按照正常的思考过程,新事物的出现,一般是为了解决旧事物的问题,或者,为了防止垄断,当然, 技术圈也遵循这个理论。

2.1 解决的问题

Stream的出现是为了解决原先队列存在的问题:
(1)Pub/Sub模式无法持久化消息,如果Redis网络异常或者宕机,消息会丢失;
(2)列表和有序集合的方式支持消息持久化,但是,不支持消息多播和分组消费。
Redis Stream一一解决了上面的问题,实现了消息持久化、消息多播和分组消费
Redis Stream既然是队列应用场景和其他队列一样:
(1)异步解耦;
(2)流量削峰;
不过Redis是基于内存的,如果是大量的消息数据建议选择其他消息队列,如RabbitMQ、Kafka、RocketMQ等。

2.2 架构

先来看一下Stream的总体架构:
在这里插入图片描述
Redis Stream有生产者、消费者和消费组,其中,
(1)消费组:有多个消费者,消费者之间是竞争关系,消费组中有一个last_delivered_id,消费组中的任意一消费者消费了消息,都会使last_delivered_id移动;
(2)消费者:消费者消费消息后,会产生pending_id,即消费者的状态变量,当消费者消费消息后,使用pending_ids记录被消费的消息,当客户端没有进行消费确认(ACK)时,pending_ids中的数据会一直增加,当客户端进行消息确认(ACK)后, 会移除pending_id。Redis官方称pending_ids为PEL(Pending Entries List),用于确保客户端至少消费一次消息,而不会在网络传输中丢失了处理。

2.3 数据结构

先从源码简单看下Stream相关的数据结构:

/* Stream item ID: a 128 bit number composed of a milliseconds time and* a sequence counter. IDs generated in the same millisecond (or in a past* millisecond if the clock jumped backward) will use the millisecond time* of the latest generated ID and an incremented sequence. */
typedef struct streamID {uint64_t ms;        /* Unix time in milliseconds. */uint64_t seq;       /* Sequence number. */
} streamID;typedef struct stream {rax *rax;               /* The radix tree holding the stream. */uint64_t length;        /* Current number of elements inside this stream. */streamID last_id;       /* Zero if there are yet no items. */streamID first_id;      /* The first non-tombstone entry, zero if empty. */streamID max_deleted_entry_id;  /* The maximal ID that was deleted. */uint64_t entries_added; /* All time count of elements added. */rax *cgroups;           /* Consumer groups dictionary: name -> streamCG */
} stream;

由源码知,stream由Radix树和streamID类型的数据构成,
其中,streamID有两部分组成,ms和seq,ms即毫秒(10位),seq即序列号,
Stream中的每一条消息使用:{毫秒}-{序列号}唯一标识。

3 基础操作

3.1 新建数据:XADD

格式:

XADD key ID field value [field value ...]

参数:

XADD mystream-test * name xiaoyi age 10
XADD mystream-test * name xiaoer age 11

在这里插入图片描述

3.2 查询数据:XRANGE

格式:

XRANGE key start end [COUNT count]

参数:

参数描述
key队列名称
start起始ID标识
end结束ID标识
COUNT查询的条数

3.2.1 查询所有数据

XRANGE mystream-test - +

参数:
-:第一条数据
+:最后一条数据
使用- + 表示拆寻所有数据。
在这里插入图片描述

3.2.2 查询指定条数

XRANGE mystream-test - + COUNT 1

在这里插入图片描述

3.2.3 查询指定范围数据

XRANGE mystream-test 1697335440000-0 1697359922197-0

在这里插入图片描述

3.3 读取数据

格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

参数:

参数描述
COUNT返回的条数
BLOCK用于设置XREAD为阻塞模式,单位毫秒,默认为非阻塞模式。非阻塞模式下,读取完毕(即使没有任何消息)立即返回,而在阻塞模式下,若读取不到内容,则阻塞等待。如果在这个时间内没有新的数据流入,那么输出(nil) (1.05s)

注:使用Block模式,配合 作为 I D ,表示读取最新的消息(在非阻塞模式 作为ID,表示读取最新的消息(在非阻塞模式 作为ID,表示读取最新的消息(在非阻塞模式无意义),若没有消息,命令阻塞!等待过程中,其他客户端向队列追加消息,则会立即读取到。

3.3.1 直接读取

XREAD STREAMS mystream-test 0

在这里插入图片描述

3.3.2 阻塞读取

XREAD BLOCK 4000 STRRAMS mystream-test $

在这里插入图片描述

3.3.3 非阻塞读取

XREAD STREAMS mystream-test 0

在这里插入图片描述

3.4 删除数据

格式:

XDEL key ID [ID ...]

参数:

参数描述
key队列名称
ID数据ID
XDEL mystream-test 1697376922916-0

在这里插入图片描述

3.5 消费组

3.5.1 创建消费组:XGROUP

格式:

XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]

参数:

参数描述
CREATE创建消费组
key队列名称
groupname消费组名称
id接收指定ID之后的消息
$接收所有的消息
参数描述
DESTROY删除消费组
key队列名称
groupname消费组名称
参数描述
DELCONSUMER删除消费组中的消费者
key队列名称
groupname消费组组名称
consumername消费者名称
# 创建接收最新消息的消费组
XGROUP CREATE mystream-test mygroup-1 $
# 创建接收所有消息的消费组
XGROUP CREATE mystream-test mygroup-2 0

在这里插入图片描述

3.5.2 删除消费组

# 删除消费组
XGROUP DESTROY mystream-test mygroup-1
XGROUP DELCONSUMER mystream-test mygroup-2

3.5.3 消费组消费消息:XREADGROUP

格式:

 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
参数描述你
group消费组名称。
consumer消费者名称。
count要读取的数量。
milliseconds阻塞时间,以毫秒为单位。
key键指定的队列名称。
ID表示消息 ID。
XREADGROUP GROUP mygroup-1 myconsumer-1 COUNT 1 BLOCK 100000 STREAMS mystream-test >

在这里插入图片描述

3.6 查看等待确认状态:XPENDING

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

在这里插入图片描述

3.7 消费信息确认:XACK

格式:

XACK key group ID [ID ...]

参数:

参数描述
key队列名称
group消费组名称
ID消息ID
XACK mystream-test mygroup-1 1698558137966-0

在这里插入图片描述

3.8 查询信息:XINFO

格式:

XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

参数:
查询消费者信息

参数名称
CONSUMERS查询消费者名称
key消费者名称
groupname
查询消费组信息
参数名称
GROUPS查询消费组信息
key消费组名称
查询队列信息
参数名称
STREAM查询队列信息
key队列名称

3.8.1 查询队列信息

XINFO STREAM mystream-test

在这里插入图片描述

3.8.3 查询队列中的消费组

XINFO GROUPS mystream-test

在这里插入图片描述

3.8.4 查询队列消费组中的消费者

XINFO CONSUMERS mystream-test mygroup-1

在这里插入图片描述

4 小结

Stream的出现是为了解决原先队列存在的问题:
(1)Pub/Sub模式无法持久化消息,如果Redis网络异常或者宕机,消息会丢失;
(2)列表和有序集合的方式支持消息持久化,但是,不支持消息多播和分组消费。
Redis Stream一一解决了上面的问题,实现了消息持久化、消息多播和分组消费
Redis Stream既然是队列应用场景和其他队列一样:
(1)异步解耦;
(2)流量削峰;
不过Redis是基于内存的,如果是大量的消息数据建议选择其他消息队列,如RabbitMQ、Kafka、RocketMQ等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/154986.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

创建并启动华为HarmonyOS本地与远程模拟器及远程真机

1.打开设备管理器 2.选择要添加的手机设备,然后点击安装 3.正在下载华为手机模拟器 4.下载完成 5.创建新模拟器 下载系统镜像 点击下一步,创建模拟器 创建成功 启动模拟器 华为模拟器启动成功 6.登陆华为账号并使用远程模拟器 7.使用远程真机

成绩不公开,如何发成绩

亲爱的老师们,有没有在学期中疯狂整理成绩单,又担心成绩私发引起混乱的烦恼?今天就让我们一起探索如何利用各种工具和代码,实现学生自主查询成绩的便捷方式吧! 成绩查询系统简介 成绩查询系统是一款方便学生和老师查询…

如果一定要在C++和JAVA中选择,是C++还是java?

如果一定要在C和JAVA中选择,是C还是java? 计算机专业的同学对这个问题有疑惑的,- 定要看一下这个回答! 上来直接给出最中肯的建议: 如果你是刚刚步入大学的大一时间非常充裕的同学,猪学长强烈建议先学C/C.因为C 非常 最近很多…

unity性能优化__Statistic状态分析

在Unity的Game视图右上角,我们会看到有Stats选项,点击会出现这样的信息 我使用的Unity版本是2019.4.16 一、Audio,顾名思义是声音信息 1:Level:-74.8dB 声音的相对强度或音量。通常,音量级别以分贝(dB&a…

15、SpringCloud -- 延迟消息、异步下单失败处理方案

目录 延迟消息需求理解:思路:代码:发送延迟消息消费延迟消息:1、订单支付状态:2、回补真实库存:3、回补预库存:4、修改本地标识:测试:清除MQ数据:期望结果:实际结果:问题:异步下单失败需求1:代码:发送消息:消费消息:测试:需求2:延迟消息 需求理解: 用户成…

Yusi技术资讯博客wordpress模板

Yusi技术资讯博客wordpress模板,从第一感觉看上去,两栏结构直接将网站的内容展现,以红白灰色调搭配,一种低调协调的风格,喜欢该wordpress主题的朋友可以下载试试。 下载地址:https://bbs.csdn.net/topics/…

IntelliJ IDEA 安装mybaits当前运行sql日志插件在线与离线安装方法

先安装好idear 去网上找找这个安装包下载下来,注意版本要完全一致! 比如: https://www.onlinedown.net/soft/1233409.htm手动安装离线插件方法举例 提前下载好插件的安装包 可以去网上下载这个安装包 搜索离线安装包的资源,包…

每日汇评:通胀数据公布前,欧元复苏失去动力

欧元/美元周一上涨后回落至1.0600; 市场参与者将密切关注欧元区通胀数据; 如果价格跌破1.0580-1.0580区间,卖方可能会采取行动; EUR/USD积聚了多头动能,周一实现了一周以来最高的日内收盘价,超过1.0600。然…

[GKCTF 2021]easycms 禅知cms

一道类似于渗透的题目 记录一下 首先扫描获取 登入界面 admin/12345登入 来到了后台 然后我们开始测试有无漏洞点 1.文件下载 设计 自定义 导出 然后进行抓包 解密后面的内容 发现是绝对路径了 所以这里我们要获取 flag 就/flag即可 L2ZsYWc /admin.php?mui&fdownlo…

云原生-AWS EC2使用、安全性及国内厂商对比

目录 什么是EC2启动一个EC2实例连接一个实例控制台ssh Security groups规则默认安全组与自定义安全组 安全性操作系统安全密钥泄漏部署应用安全元数据造成SSRF漏洞出现时敏感信息泄漏网络设置错误 厂商对比参考 本文通过实操,介绍了EC2的基本使用,并在功…

Jt808应答举例

1.前言 最近客户在集成基于Jt808的产品协议的时候,经常会遇到一些问题,比如没有进行转义,或者转义的时机不对,导致校验码没有进行转义。为了让大家更熟悉Jt808的指令组包,我这里整理了一下转义的步骤。 2.组包 以此…

SkyWalking官方文档-1-概述

概述 SkyWalking是一个开源的可观测平台,用于收集,分析,聚合,以及可视化处理来自服务和云原生框架的数据。SkyWalking提供了一种简单的方法来维护分布式系统的清晰视图,即使是跨云。 它是一种现代APM,专门…