C# redis通过stream实现消息队列以及ack机制

redis实现

查看redis版本

redis需要>5.0
Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型,Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID 的大小进行有序排列。

它实现了大部分消息队列的功能:

  • 消息 ID 系列化生成;
  • 消息遍历;
  • 消息的阻塞和非阻塞读;
  • Consumer Groups 消费组;
  • ACK 确认机制。
  • 支持多播。

本次主要实现基本的消息发送接受确认,消费组有需要的可以看参考的文章

info

在这里插入图片描述

插入消息

XADD streamName id field value [field value ...]
# 消息队列名称后面的 「*」 ,表示让 Redis 为插入的消息自动生成唯一 ID,当然也可以自己定义。
# 消息 ID 由两部分组成:当前毫秒内的时间戳; 顺序编号。从 0 为起始值,用于区分同一时间内产生的多个命令
XADD queue01 * name wjl age 25 gender male

在这里插入图片描述

读取消息

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
XREAD COUNT 1 BLOCK 0 STREAMS queue01  0-0
# 指定消费组在 Stream 中的起始 ID,它决定了消费者组从哪个 ID 之后开始读取消息,0-0 从第一条开始读取, $ 表示从最后一条向后开始读取,只接收新消息。
# 如果想使用 XREAD 进行顺序消费,每次读取后要记住返回的消息 ID,下次调用 XREAD 就将上一次返回的消息 ID 作为参数传递到下一次调用就可以继续消费后续的消息了。

在这里插入图片描述
这里只是开胃菜,通过 XREAD 读取的数据其实并没有被删除,当重新执行 XREAD COUNT 1 BLOCK 0 STREAMS queue01 0-0指令的时候又会重新读取到。

创建消费组

# Stream 通过 XGROUP CREATE 指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化 last_delivered_id 变量。
# 随便再插入一些数据
XADD queue01 * name zhangsan age 52 gender male
XADD queue01 * name lisi age 34 gender male
XADD queue01 * name xiaomei age 24 gender famale
# 创建消费组的指令
# 格式
XGROUP CREATE stream group start_id
# stream:指定队列的名字;
# group:指定消费组名字;
# start_id:指定消费组在 Stream 中的起始 ID,它决定了消费者组从哪个 ID 之后开始读取消息,0-0 从第一条开始读取, $ 表示从最后一条向后开始读取,只接收新消息。
# MKSTREAM:默认情况下,XGROUP CREATE命令在目标流不存在时返回错误。可以使用可选MKSTREAM子命令作为 之后的最后一个参数来自动创建流。# 新建group01消费组
XGROUP CREATE queue01 group01 0-0 MKSTREAM

读取群组消息

XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]XREADGROUP GROUP group01 consumer01 COUNT 1 BLOCK 0 STREAMS queue01 >
# >:命令的最后参数 >,表示从尚未被消费的消息开始读取;
# BLOCK 0:表示阻塞读取,要是大于0就是等待多少毫秒

在这里插入图片描述

如果消息队列中的消息被消费组的一个消费者消费了,这条消息就不会再被这个消费组的其他消费者读取到。

在这里插入图片描述

查看已读未确认消息

XREADGROUP GROUP groupName consumerName
XPENDING queue01 group01 

在这里插入图片描述

1 # 未读消息条数
1696822787364-0 #所有消费者读取的消息最小和最大 ID;这是最小
1696822787364-0 #所有消费者读取的消息最小和最大 ID;这是最大
consumer01
1

查看消费者读取了哪些数据

XPENDING queue01 group01 - + 10 consumer01

在这里插入图片描述

确认消息

XACK key group-key ID [ID ...]XACK queue01 group01 1696822787364-0

在这里插入图片描述
再次查询未读消息

XPENDING queue01 group01 
XREADGROUP GROUP group01 consumer01 COUNT 1 BLOCK 0 STREAMS queue01 >

在这里插入图片描述
在这里插入图片描述

C#操作redis实现

使用FreeRedis类库,熟悉了上面的流程,直接上代码

using FreeRedis;namespace RedisMQStu01
{internal class Program{async static Task Main(string[] args){var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");var queueName = "queue01";//队列的名字var groupName = "group01";//读取队列的群组的名字var consumerName = "consumer01";//消费者的名字//添加数据await cli.XAddAsync(queueName, "name", "wjl", "age", 25, "gender", "male");await cli.XAddAsync(queueName, "name", "zhangsan", "age", 52, "gender", "male");await cli.XAddAsync(queueName, "name", "lisi", "age", 34, "gender", "male");await cli.XAddAsync(queueName, "name", "xiaomei", "age", 24, "gender", "famale");//创建群组,如果数据存在则不需要执行了,第一次需要执行await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);//读取群组消息var ids = new Dictionary<string, string>();ids.Add("queue01", ">");var result = await cli.XReadGroupAsync(groupName, consumerName,1, 0, noack: false, ids);//查看已读未确认的消息var unReadResults = await cli.XPendingAsync(queueName, groupName);await Console.Out.WriteLineAsync($"未读消息条数为:{unReadResults.count}");foreach (var item in result){await Console.Out.WriteLineAsync(item.key);//群组名字foreach (var entry in item.entries){await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列idawait Console.Out.WriteAsync($"\t");foreach (var field in entry.fieldValues){await Console.Out.WriteAsync($"\t{field.ToString()}");}await Console.Out.WriteLineAsync();//确认消息await cli.XAckAsync(queueName,groupName, entry.id);}}await Console.Out.WriteLineAsync("完成");}}
}

上面的代码是生产者和消费者在一块,不满足生产环境要求,因为生产环境大多需要分开,生产者只负责生产,消费者只负责消费

生产者

using FreeRedis;namespace RedisMQProductor01
{internal class Program{/// <summary>/// redis消息队列的生产者/// </summary>/// <param name="args"></param>/// <returns></returns>async static Task Main(string[] args){var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");var queueName = "queue01";//队列的名字//添加数据await cli.XAddAsync(queueName, "name", "wjl", "age", 25, "gender", "male");await cli.XAddAsync(queueName, "name", "zhangsan", "age", 52, "gender", "male");await cli.XAddAsync(queueName, "name", "lisi", "age", 34, "gender", "male");await cli.XAddAsync(queueName, "name", "xiaomei", "age", 24, "gender", "famale");await Console.Out.WriteLineAsync("生产者添加数据完成");}}
}

消费者

using FreeRedis;namespace RedisMQConsumer01
{/// <summary>/// redis消息队列的消费者/// </summary>internal class Program{async static Task Main(string[] args){var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");var queueName = "queue01";//队列的名字var groupName = "group01";//读取队列的群组的名字var consumerName = "consumer01";//消费者的名字//如果数据存在则不需要执行了,第一次需要执行var info = await cli.XInfoGroupsAsync(queueName);if (info == null || info.Length < 1){//创建群组await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);}//读取群组消息var ids = new Dictionary<string, string>();ids.Add("queue01", ">");//block的值是0表示无限等待var result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);while (true){if (result != null && result.Length > 0){foreach (var item in result){await Console.Out.WriteLineAsync(item.key);//群组名字foreach (var entry in item.entries){await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列idawait Console.Out.WriteAsync($"\t");foreach (var field in entry.fieldValues){await Console.Out.WriteAsync($"\t{field.ToString()}");}await Console.Out.WriteLineAsync();//确认消息await cli.XAckAsync(queueName, groupName, entry.id);}}await Console.Out.WriteLineAsync("===============本次处理完毕===============");}//继续等待result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);}}}
}

先启动生产者在启动消费者查看效果
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

方法改善

改善之后可以先启动消费者然后等待生产者投递数据即可

消费者

using FreeRedis;
using Newtonsoft.Json;
using SqlSugar;namespace CelueStu02
{/// <summary>/// 备份策略消费者/// </summary>internal class Program{async static Task Main(string[] args){var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");var queueName = "queue01";//队列的名字var groupName = "group01";//读取队列的群组的名字var consumerName = "consumer01";//消费者的名字try{var streamInfo = cli.XInfoStream(queueName);}catch{await cli.XAddAsync(queueName, "student", "");}//如果数据存在则不需要执行了,第一次需要执行var info = await cli.XInfoGroupsAsync(queueName);if (info == null || info.Length < 1){//创建群组await cli.XGroupCreateAsync(queueName, groupName, id: "0-0", MkStream: true);}//读取群组消息var ids = new Dictionary<string, string>();ids.Add("queue01", ">");//block的值是0表示无限等待var result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);ConnectionConfig connectionConfig = new ConnectionConfig(){ConnectionString = "",//自己写数据库链接字符串IsAutoCloseConnection = true,DbType = DbType.SqlServer};using SqlSugarClient db = new SqlSugarClient(connectionConfig);//初始化表格db.CodeFirst.InitTables(typeof(Student));while (true){if (result != null && result.Length > 0){foreach (var item in result){await Console.Out.WriteLineAsync(item.key);//群组名字foreach (var entry in item.entries){await Console.Out.WriteLineAsync($"\t{entry.id}");//消息队列idfor (int i = 0; i < entry.fieldValues.Length; i++){var field = entry.fieldValues[i];if (field.ToString() == "student"){var studentListJson = entry.fieldValues[i + 1]?.ToString() ?? "";if (string.IsNullOrWhiteSpace(studentListJson)){continue;}var students = JsonConvert.DeserializeObject<List<Student>>(studentListJson);await db.Storageable(students).ExecuteCommandAsync();}}//确认消息await cli.XAckAsync(queueName, groupName, entry.id);}}await Console.Out.WriteLineAsync("===============本次处理完毕===============");}//继续等待result = await cli.XReadGroupAsync(groupName, consumerName, 1, 0, noack: false, ids);}}}
}

生产者

using FreeRedis;
using Newtonsoft.Json;
using SqlSugar;namespace CelueStu01
{/// <summary>/// 备份策略生产者/// </summary>internal class Program{async static Task Main(string[] args){var cli = new RedisClient("127.0.0.1:6379,password=,defaultDatabase=99");var queueName = "queue01";//队列的名字var perProcessNumber = 1000;//每次处理的数据条数int totalPage = 0;//总页码数ConnectionConfig connectionConfig = new ConnectionConfig(){ConnectionString = "",IsAutoCloseConnection = true,DbType = DbType.SqlServer};using (SqlSugarClient db = new SqlSugarClient(connectionConfig)){//初始化表格db.CodeFirst.InitTables(typeof(Student));do{int count = await db.Queryable<Student>().CountAsync();totalPage = count % perProcessNumber == 0 ? count / perProcessNumber : (count / perProcessNumber) + 1;var students = await db.Queryable<Student>().ToPageListAsync(totalPage, perProcessNumber);//批量发送,redis频繁写入会报rdb错误,限制一下写入频率await cli.XAddAsync(queueName, "student", JsonConvert.SerializeObject(students));List<int> deleteStudents = students.Select(p => p.Id).ToList();if (deleteStudents.Any()){//批量删除await db.Deleteable<Student>().Where(p => deleteStudents.Contains(p.Id)).ExecuteCommandAsync();}totalPage -= 1;//Thread.Sleep(2000);} while (totalPage > 0);}await Console.Out.WriteLineAsync("生产者添加数据完成");}}
}

参考

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/132587.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

React Native从0到1开发一款App

先贴上项目地址&#xff0c;有需要的大佬可以去github看看&#xff1a; WinWang/RNOpenEye: React Native(0.72)版本开眼OpenEye项目 (github.com) React Native&#xff08;0.72&#xff09;版本OpenEye项目&#xff0c;主要用来熟悉并上手RN项目的开发&#xff0c;是Flutte…

nodejs+vue+elementui实验室预约管理系统

简单的说 Node.js 就是运行在服务端的 JavaScript。 前端技术&#xff1a;nodejsvueelementui 前端&#xff1a;HTML5,CSS3、JavaScript、VUE实验室如何适应新的时代和新的潮流,开展有效的信息服务工作,完成时代赋予的新使命?本文就这一问题谈谈几点粗浅的看法.扩大业务范围,更…

栈的运行算法

一&#xff0c;顺序栈的静态分配 二&#xff0c;顺序栈的动态分配 #include<stdio.h> #include<stdlib.h> #define initsize 5 #define incresize 5typedef struct Sqstack{int *base;int *top;int stacksize; }Sqstack;void InitStack(Sqstack *s){(*s).base(int…

MNIST字符识别(C++)

构建网络 采用官方示例的的lenet网络 训练 相关文件都已编译好&#xff0c;下载后执行命令即可 .\caffe-bin.exe train --solver .\lenet_solver.prototxt 识别 #include <caffe/caffe.hpp>#include <opencv2/core/core.hpp> #include <opencv2/highgui/hi…

echarts画电压线

ChartLibhttp://chartlib.datains.cn/detail?idx0R9f3tOqMExamples - Apache EChartsApache ECharts&#xff0c;一款基于JavaScript的数据可视化图表库&#xff0c;提供直观&#xff0c;生动&#xff0c;可交互&#xff0c;可个性化定制的数据可视化图表。https://echarts.ap…

对验证码的识别爆破

声明&#xff1a;该系列文章首发于公众号&#xff1a;Y1X1n安全&#xff0c;转载请注明出处&#xff01;本公众号所分享内容仅用于每一个爱好者之间的技术讨论及教育目的&#xff0c;所有渗透及工具的使用都需获取授权&#xff0c;禁止用于违法途径&#xff0c;否则需自行承担&…

BN体系理解——类封装复现

from pathlib import Path from typing import Optionalimport torch import torch.nn as nn from torch import Tensorclass BN(nn.Module):def __init__(self,num_features,momentum0.1,eps1e-8):##num_features是通道数"""初始化方法:param num_features:特征…

为Yolov7环境安装Cuba匹配的Pytorch

1. 查看Cuba版本 方法一 nvidia-smi 找到CUDA Version 方法二 Nvidia Control Panel > 系统信息 > 组件 > 2. 安装Cuba匹配版本的PyTorch https://pytorch.org/get-started/locally/这里使用conda安装 conda install pytorch torchvision torchaudio pytorch-cu…

LeetCode416 分割等和子集

题目&#xff1a; 、 分析&#xff1a; 因为分割的子数组&#xff0c;不连续&#xff1b;所以双指针、栈&#xff0c;一般不适用&#xff0c;分析起来很像是DP问题。 思路&#xff1a; https://www.imooc.com/article/300277 代码&#xff1a; //TODO 这题有难度

如何在C++项目中用C#运行程序调试C++ DLL

问题描述 在C#项目中调用C DLL时报错或者运行结果不符&#xff0c;此时需要运行C#项目并在C中加入断点进行调试 项目准备 项目一&#xff1a;C#项目&#xff08;该项目调用C DLL&#xff09;项目二&#xff1a;C项目&#xff08;生成C DLL&#xff09; 这两个项目不需要在同…

【web实现右侧弹窗】JS+CSS如何实现右侧缓慢弹窗动态效果『附完整源码下载』

文章目录 写在前面涉及知识点页面效果1、页面DOM创建1.1创建底层操作dom节点1.2 创建存放弹窗dom节点 2、页面联动功能实现&#xff08;关闭与弹出&#xff09;2.1 点击非右侧区域实现关闭2.2 点击叉叉及关闭按钮实现关闭功能 3、完整源码包下载3.1百度网盘3.2 123云盘3.3邮箱留…

基于若依ruoyi-nbcio支持flowable流程增加自定义业务表单(二)

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码&#xff1a; https://gitee.com/nbacheng/ruoyi-nbcio 演示地址&#xff1a;RuoYi-Nbcio后台管理系统 之前讲了自定义业务表单&#xff0c;现在讲如何与流程进行关联 1、后端部分 WfCustomFormMapper.xml &…