Redis Stream消息队列

什么是Stream?

Stream 实际上是一个具有消息发布/订阅功能的组件,也就常说的消息队列。其实这种类似于 broker/consumer(生产者/消费者)的数据结构很常见,比如 RabbitMQ 消息中间件、Celery 消息中间件,以及 Kafka 分布式消息系统等,而 Redis Stream 正是借鉴了 Kafaka 系统。

1) 优点

Strean 除了拥有很高的性能和内存利用率外, 它最大的特点就是提供了消息的持久化存储,以及主从复制功能,从而解决了网络断开、Redis 宕机情况下,消息丢失的问题,即便是重启 Redis,存储的内容也会存在。

2) 流程

Stream 消息队列主要由四部分组成,分别是:消息本身、生产者、消费者和消费组,对于前述三者很好理解,下面了解什么是消费组。

一个 Stream 队列可以拥有多个消费组,每个消费组中又包含了多个消费者,组内消费者之间存在竞争关系。当某个消费者消费了一条消息时,同组消费者,都不会再次消费这条消息。被消费的消息 ID 会被放入等待处理的 Pending_ids 中。每消费完一条信息,消费组的游标就会向前移动一位,组内消费者就继续去争抢下消息。

 Redis Stream 消息队列结构程如下图所示:

Redis Stream结构图

 

下面对上图涉及的专有名词做简单解释:
  • Stream direction:表示数据流,它是一个消息链,将所有的消息都串起来,每个消息都有一个唯一标识 ID 和对应的消息内容(Message content)。
  • Consumer Group :表示消费组,拥有唯一的组名,使用 XGROUP CREATE 命令创建。一个 Stream 消息链上可以有多个消费组,一个消费组内拥有多个消费者,每一个消费者也有一个唯一的 ID 标识。
  • last_delivered_id :表示消费组游标,每个消费组都会有一个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。
  • pending_ids :Redis 官方称为 PEL,表示消费者的状态变量,它记录了当前已经被客户端读取的消息 ID,但是这些消息没有被 ACK(确认字符)。如果客户端没有 ACK,那么这个变量中的消息 ID 会越来越多,一旦被某个消息被 ACK,它就开始减少。
3) ACK 

ACK(Acknowledge character)即确认字符,在数据通信中,接收方传递给发送方的一种传输类控制字符。表示发来的数据已确认接收无误。在 TCP/IP 协议中,如果接收方成功的接收到数据,那么会回复一个 ACK 数据。通常 ACK 信号有自己固定的格式,长度大小,由接收方回复给发送方。

常用命令汇总

Redis Stream命令
命令说明
XADD 添加消息到末尾。
XTRIM对 Stream 流进行修剪,限制长度。
XDEL删除指定的消息。
XLEN获取流包含的元素数量,即消息长度。
XRANGE获取消息列表,会自动过滤已经删除的消息。
XREVRANGE 反向获取消息列表,ID 从大到小。
XREAD以阻塞或非阻塞方式获取消息列表。
XGROUP CREATE创建消费者组。
XREADGROUP GROUP读取消费者组中的消息。
XACK将消息标记为"已处理"。
XGROUP SETID为消费者组设置新的最后递送消息ID。
XGROUP DELCONSUMER删除消费者。
XGROUP DESTROY删除消费者组。
XPENDING显示待处理消息的相关信息。
XCLAIM 转移消息的归属权。
XINFO查看 Stream 流、消费者和消费者组的相关信息。
XINFO GROUPS查看消费者组的信息。
XINFO STREAM 查看 Stream 流信息。
XINFO CONSUMERS key group查看组内消费者流信息。

创建消息ID

当创建一个 Srteam 时, 需要创建消息 ID,该 ID 是唯一、不可重复的,并且只增不减。消息 ID 有两种创建方式,一是系统自动生成,二是自定义创建。

1) 系统自动创建

语法格式如下:

XADD key ID field value [field value ...]

参数说明如下:

  • key :指定队列名称,如果不存就创建;
  • ID :消息 id,我们使用*表示由 redis 生成,可以自定义,但是要自己保证递增性;
  • field value :消息记录。


返回值是毫秒时间戳格式的字符串。比如 1610619132674-2,它表示在该毫秒内产生的第 2 条消息。使用示例:

XADD mystream * username cc 10

2) 自定义ID

自定义 ID 比较简单,但是需要注意的是 ID 的形式必须是 “整数”,并且后面加入消息的 ID 必须大于前面消息的 ID,也就是自定义 ID 也必须遵守递增的规则。示例如下:

XADD mystream1 001 name zhangsan addr hebei

创建消费组

Redis Stream通过XGROUP CREATE指令创建消费组(Consumer Group),在创建时,需要传递起始消息的 ID 用来初始化 last_delivered_id 变量。语法格式如下:

<span style="color:#444444">XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]</span>

参数说明如下:

  • key :指定 Stream 队列名称,若不存在则自动创建。
  • groupname :自定义消费组的名称,不可重复。
  • $ :表示从尾部开始消费,只接受新消息,而当前 Stream 的消息则被忽略。

消费消息

Redis Stream 通过XREADGROUP命令使消费组消费信息,它和XREAD命令一样,都可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PLE(正在处理的消息)结构里,客户端处理完毕后使用 XACK 命令通知 Redis 服务器,本条消息已经处理完毕,该消息的 ID 就会从 PEL 中移除。示意图如下

redis stream

XREADGROUP命令的语法格式如下所示:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]  

参数说明如下:

  • group :消费组名称。
  • consumer :消费者名称。
  • count : 要读取的数量。
  • milliseconds : 阻塞时间,以毫秒为单位。
  • key :  键指定的队列名称。
  • ID : 表示消息 ID。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/217381.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

openEuler20.03学习01-创建虚拟机

赶个时髦&#xff0c;开始学习openEuler 20.03 (LTS-SP3) 操作系统iso下载地址&#xff1a;https://repo.openeuler.openatom.cn/openEuler-20.03-LTS-SP3/ISO/x86_64/openEuler-20.03-LTS-SP3-x86_64-dvd.iso 公司有现成的vmware环境&#xff0c;创建虚拟机i测试&#xff0c…

qgis添加arcgis的mapserver

左侧浏览器-ArcGIS地图服务器-右键-新建连接 Folder: / 展开-双击图层即可

通过ros系统中websocket中发送sensor_msgs::Image数据给web端显示(三)

通过ros系统中websocket中发送sensor_msgs::Image数据给web端显示(三) 不使用base64编码方式传递 #include <ros/ros.h> #include <signal.h> #include <sensor_msgs/Image.h> #include <message_filters/subscriber.h> #include <message_filter…

leetcode中“辅助栈”类题目和“单调栈”类题目的异同

1 总结 1 栈中元素的特性 2 单调栈存在一次性连续删除多个栈顶的情况&#xff0c;但是普通的栈&#xff0c;一次只pop掉一个栈顶元素 2 LC1209. 删除字符串中的所有相邻重复项 II - 普通辅助栈 class Solution {public String removeDuplicates(String s, int k) {int ns.l…

mysql忘记密码,然后重置

数据库版本8.0.26 只针对以下情况 mysql忘记了密码&#xff0c;但是你navicat之前连接上了 解决方法&#xff1a; 第一步&#xff0c;选中mysql这个数据库&#xff0c;点击新建查询 第二步&#xff1a;重置密码 alter user rootlocalhost IDENTIFIED BY 你的密码; 然后就可…

随机生成字母

目录 css代码 html代码 js代码 css代码 .box {width: 400px;height: 400px;background-color: #797979;margin: 100px auto;padding-top: 100px;}.text{width: 200px;height: 50px;outline: none;border: 5px solid #545454;background-color: #797979;border-radius: 10px;f…

5G NSA注册解析及图标显示方案

5G NSA注册解析及图标显示方案 1. NSA注册流程解析1.1 NSA注册流程1.2 NAS消息信元变化1.3 UE能力信元变化1.3.1 第一次UE能力查询1.3.2 后续UE能力查询1.3.3 UE能力过滤器解析 1.4 UE测量配置1.5 SCG添加消息解析1.6 SCG添加成功1.7 Split Bearer承载的建立1.8 NR协议查询索引…

python动态规划求解最长回文子串

回文是什么&#xff0c;回文是正着读和反着读都是一样的字符叫着回文。  如 ‘aba’&#xff0c;‘aa’&#xff0c;‘b’&#xff0c;这些都是回文 class Solution:def longestPalindrome(self,s: str) -> str:n len(s)dp [[False] * n for _ in range(n)]ans "…

微信公众号快速接入大模型

今天找到一个可以快速将大模型接入公众号的方法&#xff0c;现在跟大家分享一下。 如何让微信公众号接入大模型文案创作能力&#xff0c;实现类似ChatGPT文案创作功能。方法其实很简单&#xff0c;只需打开地址“http://www.botaigc.cn:8900/mpauth”&#xff0c;用微信扫码即可…

应用程序安装异常(-113)

应用程序安装异常(-113) 报错如下&#xff1a;    应用未安装:应用与您的手机不兼容。    应用程序安装异常(-113)    这种情况是说我们的是x86架构&#xff0c;但是你运行的项目支持的是arm架构&#xff0c;所以你需要让自己的项目也支持arm的架构。 方案一 在项目的…

redis运维(二十)redis 的扩展应用 lua(二)

一 redis 的扩展应用 lua redis lua脚本语法 ① 什么是脚本缓存 redis 缓存lua脚本 说明&#xff1a; 重启redis,脚本缓存会丢失 下面讲解 SCRIPT ... 系列 SCRIPT ② LOAD 语法&#xff1a;SCRIPT LOAD lua代码 -->载入一个脚本,只是预加载,不执行思考1&#xff1…

Praat脚本-038 | 批量替换标注TextGrid里的换行符

目录 引题获取脚本运行脚本知识引申关注版权说明 引题 我们在做标注的时候&#xff0c;可能会犯这样一个小错误&#xff0c;就是在标注的内容中间不小心进行了换行&#xff0c;大概会成为这个样子。 这样会有什么问题呢&#xff1f;在提取数据的时候&#xff0c;你会发现&…