golang工程组件——redigo使用(redis协议,基本命令,管道,事务,发布订阅,stream)

redisgo

redis 与 client 之间采用请求回应模式,一个请求包对应一个回应包;但是也有例外,pub/sub 模 式下,client 发送 subscribe 命令并收到回应包后,之后被动接收 redis 的发布包;所以若需要使 用 pub/sub 模式,那么需要在 client 下创建额外一条连接与 redis 交互;

在这里插入图片描述

Redis 协议图

在这里插入图片描述

redis 协议采用特殊字符( \r\n )来分割有意义的数据,redis 使用的是二进制安全字符串(用长 度来标识字符串的长度),所以 redis 可以用来存储二进制数据(比如经过压缩算法加密的数 据);

例如

set key val
# "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$3\r\nval\r\n"

命令执行

Do(commandName string, args ...interface{}) (reply interface{}, err error)

redisgo参数转换

Redis TypeGo Type
errorredis.Error
integerint64
simple stringstring
bulk string[]byte or nil if value not present
array[]interface{} or nil if value not present

参数转换处理

  • 单个string[]byte直接传递;
  • intfloat需要转成 string
  • 也可用redis.Args类型来处理,提供了方法 Add 添加单个元素;提供了方法 AddFlat添加了 对 map[interface{}]interface{}以及结构体的处理;

返回值转换处理

redisgo返回值有多种

  • 接收单个返回值处理
  • 接收多个不同类型的返回值处理

redis.Values+redis.Scan

  • 接收单个结构体

redis.Values+redis.ScanStruct

  • 接收多个结构体

redis.Values+redis.ScanSlice

案例

链接认证

package mainimport ("fmt""reflect"_ "reflect""github.com/garyburd/redigo/redis"
)func main() {conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "10.65.143.2", 31923))if err != nil {panic(err)}defer (func() {fmt.Println("connection close")conn.Close()})()///Sangfor-paas.237// 密码认证if _, authErr := conn.Do("AUTH", "paas.ss"); authErr != nil {fmt.Println("Redis auth error", authErr)return}}

set,get,list操作

package mainimport ("fmt""reflect"_ "reflect""github.com/garyburd/redigo/redis"
)func main() {conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "10.65.143.2", 31923))if err != nil {panic(err)}defer (func() {fmt.Println("connection close")conn.Close()})()///Sangfor-paas.237// 密码认证if _, authErr := conn.Do("AUTH", "paas.ss"); authErr != nil {fmt.Println("Redis auth error", authErr)return}if false {conn.Do("set", "test_hello", 1)rpy, err := redis.Int(conn.Do("get", "test_hello"))if err != nil {panic(err)}fmt.Println(rpy, reflect.TypeOf(rpy))}if false {args := redis.Args{"test_list"}.Add("test1").Add("test2")conn.Do("lpush", args...)res, _ := redis.Strings(conn.Do("lrange", "test_list", 0, -1))fmt.Println(res, reflect.TypeOf(res))}if false {//conn.Do("del", "test_list")// 阻塞popvals, err := redis.Strings(conn.Do("brpop", redis.Args{}.Add("test_list").Add(20)...))if err != redis.ErrNil {fmt.Println(vals, err)}}if false {// 按map形式返回vals, err := redis.StringMap(conn.Do("brpop", redis.Args{}.Add("test_list").Add(20)...))if err != redis.ErrNil {fmt.Println(vals, err)}}
}

变量读取

package mainimport ("fmt""reflect"_ "reflect""github.com/garyburd/redigo/redis"
)func main() {conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "10.65.143.2", 31923))if err != nil {panic(err)}defer (func() {fmt.Println("connection close")conn.Close()})()///Sangfor-paas.237// 密码认证if _, authErr := conn.Do("AUTH", "paas.ss"); authErr != nil {fmt.Println("Redis auth error", authErr)return}// 取元素到对应的变量上if false { // redis.Values + redis.Scanconn.Do("del", "list")conn.Do("lpush", "list", "aaabb", 100)vals, _ := redis.Values(conn.Do("lrange", "list", 0, -1))var name stringvar score intredis.Scan(vals, &score, &name)fmt.Println(name, score)}}

结构体读取

package mainimport ("fmt""reflect"_ "reflect""github.com/garyburd/redigo/redis"
)func main() {conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "10.65.143.2", 31923))if err != nil {panic(err)}defer (func() {fmt.Println("connection close")conn.Close()})()///Sangfor-paas.237// 密码认证if _, authErr := conn.Do("AUTH", "paas.ss"); authErr != nil {fmt.Println("Redis auth error", authErr)return}if true {var p1, p2 struct {Name string `redis:"name"`Age  string `redis:"age"`Sex  string `redis:"sex"`}p1.Age = "18"p1.Name = "chaochaoyu"p1.Sex = "male"// age 18 name mark sex maleargs1 := redis.Args{}.Add("role:10001").AddFlat(&p1)if _, err := conn.Do("hmset", args1...); err != nil {fmt.Println(err)return}m := map[string]string{"name": "quxiansen","age":  "20","sex":  "female",}args2 := redis.Args{}.Add("role:10002").AddFlat(m)if _, err := conn.Do("hmset", args2...); err != nil {fmt.Println(err)return}for _, id := range []string{"role:10001", "role:10002"} {v, err := redis.Values(conn.Do("HGETALL", id))if err != nil {fmt.Println(err)return}if err := redis.ScanStruct(v, &p2); err != nil {fmt.Println(err)return}fmt.Printf("%+v\n", p2)}}

reidsgo—管道以及事务

管道

redis pipeline 是一个客户端提供的,而不是服务端提供的;一次发送多条命令,减少与 redis-server 之间的网络交互;

在这里插入图片描述

type Conn interface {// Close closes the connection.Close() error// Err returns a non-nil value when the connection is not usable.Err() error// Do sends a command to the server and returns the received reply.Do(commandName string, args ...interface{}) (reply interface{}, errerror)// Do = Send + Flush + Receive// Send writes the command to the client's output buffer.Send(commandName string, args ...interface{}) error// Flush flushes the output buffer to the Redis server.Flush() error// Receive receives a single reply from the Redis serverReceive() (reply interface{}, err error)
}
代码使用
// 批量发送,批量接收
c.Send(cmd1, ...)
c.Send(cmd2, ...)
c.Send(cmd3, ...)
c.Flush() // 将上面的三个命令发送出去
c.Receive() // cmd1 的返回值
c.Receive() // cmd2 的返回值
c.Receive() // cmd3 的返回值
// 如果不需要关注返回值
c.Send(cmd1, ...)
c.Send(cmd2, ...)
c.Send(cmd3, ...)
c.Do("")
// 如果只关注最后一个命令的返回值
c.Send(cmd1, ...)
c.Send(cmd2, ...)
c.Do(cmd3, ...)
reids 网络事件处理

r在这里插入图片描述
edis 是单线程处理逻辑;网络事件处理以及命令处理都是在这个线程当中进行的; 每条连接都对应着一个读缓冲区,线程需要轮询每条连接,从连接的读缓冲区中分割出一个个有意义的数据包,每条连接的读缓冲区相当于一个队列;线程会交错执行活跃连接的命令

客户端批量发送测试
package mainimport ("fmt""github.com/garyburd/redigo/redis""math/rand""time"
)func main() {c, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "10.65.143.2", 31923))if err != nil {panic(err)}defer (func() {fmt.Println("connection close")c.Close()})()///paas.237// 密码认证if _, authErr := c.Do("AUTH", "paas.237"); authErr != nil {fmt.Println("Redis auth error", authErr)return}if false {c.Send("del", "set", "list", "zset")c.Send("sadd", "set", "aa", "bb", "cc")c.Send("lpush", "list", 10001, 10002, 10003)c.Send("smembers", "set")c.Send("lrange", "list", 0, -1)c.Flush()c.Receive() // delc.Receive() // saddc.Receive() // lpushmbrs, err := redis.Strings(c.Receive()) // smembersif err != redis.ErrNil {fmt.Println(mbrs)}lsts, err := redis.Ints(c.Receive()) // lrangeif err != redis.ErrNil {fmt.Println(lsts)}}if false {c.Send("del", "set", "list", "zset")c.Send("sadd", "set", "aa", "bb", "cc")c.Send("lpush", "list", 10001, 10002, 10003)// do里面有flush和所有receivec.Do("")}if true {rand.Seed(time.Now().UnixNano())c.Send("del", "set", "list", "zset")c.Send("sadd", "set", "aa", "bb", "cc"){args := redis.Args{}.Add("zset")args = args.Add(rand.Intn(100)).Add("xiaoming")args = args.Add(rand.Intn(100)).Add("xiaohong")args = args.Add(rand.Intn(100)).Add("xiaohuang")c.Send("zadd", args...)}{args := redis.Args{}.Add("zset")args = args.Add(0).Add(-1).Add("withscores")// 只关注最后一个的返回值vals, err := redis.Values(c.Do("zrange", args...))fmt.Printf("vals:%v\n", vals)if err != nil {panic(err)}//返回值是反过来的,name要放前面var rets []struct {Name  stringScore int}if err = redis.ScanSlice(vals, &rets); err != nil {panic(err)}fmt.Println(rets)}}
}

事务

虽然redis是单线程,对于一条链接请求队列是线性执行的,如果有多条链接,那么redis线程执行命令的顺序是根据各个队列中先后顺序来的。如果要实现事务,需要保证执行事务包含的命令中间时不插入别的会干扰需要操作数据的命令。

redis事务操作

MULTI 开启事务,事务执行过程中,单个命令是入队列操作,直到调用 EXEC 才会一起执行;

MULTI

开启事务

EXEC

提交事务

DISCARD

取消事务

WATCH

检测key的变动,若在事务请求时,key变动则取消事务;在事务开启前调用,乐观锁实现(cas); 若被取消则事务返回 nil ;

WATCH score:10001
val = GET score:10001
MULTI
SET score:10001 val*2
EXEC
reids ACID特性
  • 原子性:事务是一个不可分割的工作单位,事务中的操作要么全部成功,要么全部失败;redis 不支持回滚;即使事务队列中的某个命令在执行期间出现了错误,整个事务也会继续执行下去,直 到将事务队列中的所有命令都执行完毕为止。
  • 一致性:事务使数据库从一个一致性状态到另外一个一致性状态;这里的一致性是指预期的一致性(有命令出错后续命令也会执行)而不是异常后的一致性;所以redis也不满足;
  • 隔离性:事务的操作不被其他用户操作所打断;redis命令执行是串行的,redis事务天然具备隔 离性;
  • 持久性:redis只有在 aof 持久化策略的时候,并且需要在 redis.conf 中 appendfsync=always 才具备持久性;实际项目中几乎不会使用 aof 持久化策略;

lua实现原子性操作

lua 脚本实现原子性;

redis中加载了一个 lua 虚拟机;用来执行 redis lua 脚本;redis lua 脚本的执行是原子性的;当 某个脚本正在执行的时候,不会有其他命令或者脚本被执行;

lua 脚本当中的命令会直接修改数据状态;

注意:如果项目中使用了 lua 脚本,不需要使用上面的事务命令;

# 从文件中读取 lua脚本内容
cat test1.lua | redis-cli script load --pipe
# 加载 lua脚本字符串 生成 sha1
> script load 'local key = KEYS[1];local s = redis.call("get",key);redis.call("set", key, s*2);return s*2'
"8f7d021dcc386a422e0febe38befdc6084357610"
# 检查脚本缓存中,是否有该 sha1 散列值的lua脚本
> script exists "8f7d021dcc386a422e0febe38befdc6084357610"
1) (integer) 1
# 清除所有脚本缓存
> script flush
OK
# 如果当前脚本运行时间过长,可以通过 script kill 杀死当前运行的脚本
> script kill
(error) NOTBUSY No scripts in execution right now.
执行lua脚本
EVAL
# 测试使用
EVAL script numkeys key [key ...] arg [arg ...]
EVALSHA
# 线上使用
EVALSHA sha1 numkeys key [key ...] arg [arg ...]

eg:

script load 'local key = KEYS[1];local s = redis.call("get",key);redis.call("set", key, s*2);return s*2'
"dbb7d4a2a615df353820f35ffa710a45fa1c4ec0"
set score 100
# 有一个参数
evalsha dbb7d4a2a615df353820f35ffa710a45fa1c4ec0 1 score
应用
# 1: 项目启动时,建立redis连接并验证后,先加载所有项目中使用的lua脚本(script load);
# 2: 项目中若需要热更新,通过redis-cli script flush;然后可以通过订阅发布功能通知所有服务器重新加载lua脚本;
# 3:若项目中lua脚本发生阻塞,可通过script kill暂停当前阻塞脚本的执行;

事务建议用lua脚本

go操作reids执行lua脚本

double.lua

local key = KEYS[1]
local default = ARGV[1]
if redis.call("exists", key) == 0 thenredis.call("set", key, default)
end
local val = redis.call("get", key)
redis.call("set", key, val*2)
return val*2

script.go

package mainimport ("fmt""github.com/garyburd/redigo/redis""io/ioutil"
)func main() {c, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "10.65.143.2", 31923))if err != nil {panic(err)}defer (func() {fmt.Println("connection close")c.Close()})()///ssr-paas.237// 密码认证if _, authErr := c.Do("AUTH", "sss-paas.237"); authErr != nil {fmt.Println("Redis auth error", authErr)return}var data []bytedata, err = ioutil.ReadFile("script/double.lua")if err != nil {fmt.Println("load double.lua error")return}// 设置一个参数script := redis.NewScript(1, string(data))script.Load(c)if true {c.Send("set", "score", 1000)rpy, _ := redis.Int(script.Do(c, "score"))fmt.Println(rpy)}if true {rpy, _ := redis.Int(script.Do(c, "bbb", 500))fmt.Println(rpy)}}

发布订阅

为了支持消息的多播机制,redis 引入了发布订阅模块;

生产者生产一次消息,由redis负责将消息复制到多个消息队列中,每个消息队列由相应的消费者进行消费;

它是分布式系统中常用的一种解耦方式,用于将多个消费者的逻辑进行拆分。多个消费者的逻辑就可以放到不同的子系统中完成;

在这里插入图片描述

# 订阅频道
subscribe 频道
psubscribe new.car
# 订阅模式频道
psubscribe 频道
psubscribe new.*
# 取消订阅频道
unsubscribe 频道
# 取消订阅模式频道
punsubscribe 频道
# 发布具体频道或模式频道的内容
publish 频道 内容
# 客户端收到具体频道内容
message 具体频道 内容
# 客户端收到模式频道内容
pmessage 模式频道 具体频道 内容
subscribe news.it news.showbiz news.car
psubscribe news.*
publish new.showbiz 'aaa bbb ccc'

go订阅频道实现

package mainimport ("fmt""github.com/garyburd/redigo/redis"
)func main() {sp, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))if err != nil {return}defer sp.Close()spc := redis.PubSubConn{Conn: sp}if false {// c.Do  =  c.Send + c.Flush + c.Receivespc.Subscribe("news.it")  // send + flushfor {switch v := spc.Receive().(type) {case redis.Message:fmt.Printf("%s: message: %s\n", v.Channel, v.Data)case redis.Subscription: // 是否注册成功的消息fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)case error:return}}}if false {// it showbz carsspc.PSubscribe("news.*")for {switch v := spc.Receive().(type) {case redis.PMessage:fmt.Printf("%s: pmessage: %s\n", v.Channel, v.Data)case redis.Subscription: // 是否注册成功的消息fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)case error:return}}}
}

注意

发布订阅功能一般要区别命令连接重新开启一个连接;因为命令连接严格遵循请求回应模式;而pubsub能收到redis主动推送的内容;所以实际项目中如果支持pubsub的话,需要另开一条连接用于处理发布订阅;

在这里插入图片描述

缺点

发布订阅的生产者传递过来一个消息,redis会直接找到相应的消费者并传递过去;假如没有消费者,消息直接丢弃;假如开始有2个消费者,一个消费者突然挂掉了,另外一个消费者依然能收到消息,但是如果刚挂掉的消费者重新连上后,在断开连接期间的消息对于该消费者来说彻底丢失了;

另外,redis停机重启,pubsub的消息是不会持久化的,所有的消息被直接丢弃;

stream

*在这里插入图片描述
多播可持久化队列。

  • 一个消息链表将加入的消息都串起来,每个消息都有一个唯一的消息ID和对应的内容;消息都是持久化的,redis 重启后,内容还在;

  • 每个 stream 对象通过一个 key 来唯一索引;每个 stream 都可以挂多个消费组(consumergroup),每个消费组会有个游标 last_delivered_id 在 stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。

  • stream 在第一次使用 xadd 命令后自动创建;而消费组不会自动创建,需要通过命令 xgroup create 进行创建,并且需要指定从 stream 的某个消息 ID 开始消费;

  • 每个消费组都是相互独立的,互相不受影响;也就是同一份 stream 内部的消息会被每个消费组都消费到;

  • 同一个消费组可以挂接多个消费者,这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标往前移动;

  • 消费者内部会有一个状态变量 pending_ids,它记录了当前已经被客户端读取,但是还没有 ack 的消息。当客户端 ack 一条消息后,pending_ids 将会删除该消息 ID;它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了而没有被处理;

命令
# 向 stream 追加消息, ID可以填* 让redis生成全局唯一
XADD key ID field string [field string ...]
# 从 stream 中删除消息
XDEL key ID [ID ...]
# 获取 stream 中消息列表,会自动过滤已经删除的消息
XRANGE key start end [COUNT count]
# 获取 stream 消息长度
XLEN key
# 删除 stream 中所有消息
DEL key
# 独立消费
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
# 创建消费者
XGROUP [CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
# 消费消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
# > 意味着消费者希望只接收从未发送给任何其他消费者的消息。最新的消息
# 任意其他id 发送待处理的消息
# 确认消费消息
XACK key group ID [ID ...]
#创建stream_test stream 并由redis生成id
XADD stream_test * message "hello word"
"1626511386012-o"
#创建消费组g1 从0开始消费
XGROUP CREATE stream g1 0-0XLEN stream
(integer) 1
xrange stream - +#生成一个消费者,消费一个
xreadgroup group g1 consumer1 count 1 streams stream_test 0 # > 是获取最新消息, 0代表是任意ID# ACK该消息
xack stream_test g1 "1626511386012-o"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/176572.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue 使用js new Map()优化多个if else 执行方法

前言 在实际开发中根据业务需求我们经常要判断情况,一个if 我们科技直接使用ES6就可以解决 经常会出现根据不同的条件执行不同的方法,这是就会有多个if else 看起不太美观也费劲 js new map ()就可以解决这个问题,它…

Java封装一个根据指定的字段来获取子集的工具类

工具类 ZhLambdaUtils SuppressWarnings("all") public class ZhLambdaUtils {/*** METHOD_NAME*/private static final String METHOD_NAME "writeReplace";/*** 获取到lambda参数的方法名称** param <T> parameter* param function functi…

全志R128基础组件开发指南——图像采集

图像采集 CSI&#xff08;DVP&#xff09; 图像采集 SENSOR -> CSI 通路 CSI &#xff08;CMOS sensor interface&#xff09;接口时序上可支持独立 SYNC 和嵌入 SYNC(CCIR656)。支持接收 YUV422 或 YUV420 数据。 VSYNC 和HSYNC 的有效电平可以是正极性&#xff0c;也可…

流量分析(5.5信息安全铁人三项赛数据赛题解)

黑客通过外部的web服务器攻击到企业内部的系统中&#xff0c;并留下了web后门&#xff0c;通过外部服务器对内部进行了攻击。 目录 黑客攻击的第一个受害主机的网卡IP地址 黑客对URL的哪一个参数实施了SQL注入 第一个受害主机网站数据库的表前缀(加上下划线 例如abc_) 第一…

当路由器突然提示“未检测到入户网线”

找到浏览器的“安全DNS”&#xff0c;根据浏览器不同可能有差别。 开启安全DNS&#xff0c;使用“自定义” 地址用阿里的&#xff08;其它的也行&#xff09;&#xff1a; https://dns.alidns.com/dns-query{?dns}

【工程部署】在RK3588上部署OCR(文字检测识别)(DBNet+CRNN)

硬件平台&#xff1a; 1、firefly安装Ubuntu系统的RK3588&#xff1b; 2、安装Windows系统的电脑一台&#xff0c;其上安装Ubuntu18.04系统虚拟机。 参考手册&#xff1a;《00-Rockchip_RKNPU_User_Guide_RKNN_API_V1.3.0_CN》 《RKNN Toolkit Lite2 用户使用指南》 1、文…

AI创作系统ChatGPT网站源码+支持最新GPT-Turbo模型+支持DALL-E3文生图/AI绘画源码

一、AI创作系统 SparkAi创作系统是基于OpenAI很火的ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如…

【uni-app + uView】CountryCodePicker 国家区号组件

1. 效果图 2. 组件完整代码 <template><u-popup class="country-code-picker-container" v-if="show" :show

基于Qt 多线程(继承 QObject 的线程)

​ 继承 QThread 类是创建线程的一种方法,另一种就是继承QObject 类。继承 QObject 类更加灵活。它通过 QObject::moveToThread()方法,将一个 QObeject的类转移到一个线程里执行。恩,不理解的话,我们下面也画个图捋一下。 通过上面的图不难理解,首先我们写一个类继承 QObj…

虹科示波器 | 汽车免拆检修 | 2021款广汽丰田威兰达PHEV车发动机故障灯异常点亮

一、故障现象 一辆2021款广汽丰田威兰达PHEV车&#xff0c;搭载A25D-FXS发动机和动力蓄电池系统&#xff08;额定电压为355.2V&#xff0c;额定容量为45.0Ah&#xff09;&#xff0c;累计行驶里程约为1万km。车主反映&#xff0c;高速行驶时发动机突然抖动&#xff0c;且发动机…

原生JS实现视频截图

视频截图效果预览 利用Canvas进行截图 要用原生js实现视频截图&#xff0c;可以利用canvas的绘图功能 ctx.drawImage&#xff0c;只需要获取到视频标签&#xff0c;就可以通过drawImage把视频当前帧图像绘制在canvas画布上。 const video document.querySelector(video) con…

在Vue.js中,什么是slot(插槽)?它的作用是什么?

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…