文章目录
- 分布式锁
- 基本概念及问题
- 超时问题
- 可重入锁
- Pub/Sub
- 消息多播
- Pub/Sub
- 模式订阅
- 消息结构
- `PubSub`缺陷
- 线程 `I/O`模型
- 非阻塞I/O
- 事件轮询(多路复用)
- `select`函数
- 指令队列
分布式锁
基本概念及问题
如果操作要修改用户状态,需要先
读取
再修改
用户状态,在内存修改,修改过之后再还回去,这个操作不是原子的
ps: 原子操作
是不会被线程调度打断的操作,这种操作一旦开始,就会运行到结束,不会有context switch
切换
分布式锁指令:
setnx lock:codehole true
... do something
del lock:codehole
出现的问题1:
如果在
setnx
和del
中间出现异常,就无法正常del
解决方案:
我们拿到锁之后给锁加上一个
expire
,超过时间自动释放
伪逻辑:
setnx lock:codehole true
expire lock:codehole 5
... do something
del lock:codehole
出现的问题2:
如果在
setnx
和expire
之间执行的过程中服务器出现异常,就会导致lock无法正常释放
解决方案:
究其原因还是
setnx
和expire
是两条指令而不是原子指令,之后的redis
的set
将setnx
和expire
可以一起执行
伪逻辑:
set lock:codehole true ex 5 nx OK
超时问题
出现的问题:
thread1
出现超时问题,这个时候thread2
就暂时拿到了这个锁,但是紧接着thread1
完成了逻辑,thread3
就会在thread2
执行完成前获取锁
解决方案:
Redis
尽量不要进行长时间任务
tag = random.nextint() # 随机数 if redis.set(key, tag, nx=True, ex=5):
do_something()
redis.delifequals(key, tag) # 假象的 delifequals 指令
2.使用Redis
的set
设置一个value
随机数,释放锁的时候需要匹配随机数是否一致,然后再del key
,因为value
和del key
不是原子操作,所以需要借助lua
来实现这个原子操作
# delifequals
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
可重入锁
可重入锁是指一个线程在持有锁的情况下可以再次加锁,如果一个锁可以支持一个线程多次加锁,那么这个锁就是可重入的
#- * -coding: utf - 8
import redis
import threading
locks = threading.local() locks.redis = {}
def key_for(user_id):return "account_{}".format(user_id)
def _lock(client, key):return bool(client.set(key, True, nx = True, ex = 5))
def _unlock(client, key): client.delete(key)
def lock(client, user_id): key = key_for(user_id) if key in locks.redis:locks.redis[key] += 1
return True
ok = _lock(client, key) if not ok:return False locks.redis[key] = 1
return True
def unlock(client, user_id): key = key_for(user_id) if key in locks.redis:locks.redis[key] -= 1
if locks.redis[key] <= 0: del locks.redis[key]
return True
return False
client = redis.StrictRedis()
print "lock", lock(client, "codehole")
print "lock", lock(client, "codehole")
print "unlock", unlock(client, "codehole")
print "unlock", unlock(client, "codehole")
Java版本
public class RedisWithReentrantLock
{private ThreadLocal < Map > lockers = new ThreadLocal < > ();private Jedis jedis;public RedisWithReentrantLock(Jedis jedis){this.jedis = jedis;}private boolean _lock(String key){return jedis.set(key, "", "nx", "ex", 5 L) != null;}private void _unlock(String key){jedis.del(key);}private Map < String, Integer > currentLockers(){Map < String, Integer > refs = lockers.get();if(refs != null){return refs;}lockers.set(new HashMap < > ());return lockers.get();}public boolean lock(String key){Map refs = currentLockers();Integer refCnt = refs.get(key);if(refCnt != null){refs.put(key, refCnt + 1);return true;}boolean ok = this._lock(key);if(!ok){return false;}refs.put(key, 1);return true;}public boolean unlock(String key){Map refs = currentLockers();Integer refCnt = refs.get(key);if(refCnt == null){return false;}refCnt -= 1;if(refCnt > 0){refs.put(key, refCnt);}else{refs.remove(key);this._unlock(key);}return true;}public static void main(String[] args){Jedis jedis = new Jedis();RedisWithReentrantLock redis = new RedisWithReentrantLock(jedis);System.out.println(redis.lock("codehole"));System.out.println(redis.lock("codehole"));System.out.println(redis.unlock("codehole"));System.out.println(redis.unlock("codehole"));}
}
Pub/Sub
消息多播
消息多播允许
producer
生产一次消息,中间件将消息复制到多个消息队列,但是Redis
消息队列不支持消息多播
Pub/Sub
为了支持消息多播,
Redis
单独使用了一个模块Pub/Sub
package mainimport ("fmt""github.com/go-redis/redis"
)const (RedisChannel = "my_channel"RedisTopic = "my_topic"
)// 定义一个 Redis 客户端实例
var redisClient *redis.Clientfunc init() {redisClient = redis.NewClient(&redis.Options{Addr: "localhost:6379",Password: "zyr5201314", // no password setDB: 0, // use default DB})
}// 生产者:向指定的频道发布消息
func publish(channel string, message string) {err := redisClient.Publish(channel, message).Err()if err != nil {fmt.Println("Failed to publish message:", err)}
}// 消费者:从指定的频道订阅消息
func subscribe(channel string) {pubsub := redisClient.Subscribe(channel)defer pubsub.Close()// 读取订阅的消息for msg := range pubsub.Channel() {fmt.Printf("Received message on channel [%s]: %s\n", channel, msg.Payload)}
}func main() {// 启动两个消费者,分别订阅 "my_channel" 和 "my_topic" 频道go subscribe(RedisChannel)go subscribe(RedisTopic)// 生产者向 "my_channel" 频道发布消息publish(RedisChannel, "Hello, Redis!")// 生产者向 "my_topic" 频道发布消息publish(RedisTopic, "Hello, Redis with topic!")// 阻塞主线程,等待消息处理完成select {}
}
需要注意的是
Client
在Sub
之后,Redis
会给予反馈成功消息,因为网络延迟,所以在sub
之后需要休眠一会才能拿到get\_message
的消息,Client
在Pub
之后也需要续面一会才能通过get\_message
拿到发布的消息,如果没有get\_message
会返回空
模式订阅
Consumer
可以订阅一个或多个主题
subscribe codehole.image codehole.text codehole.blog
如下:
subscribe
## 进行多条订阅
127.0.0.1:6379> subscribe codehole.image codehole.text codehole.blog
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "codehole.image"
3) (integer) 1
1) "subscribe"
2) "codehole.text"
3) (integer) 2
1) "subscribe"
2) "codehole.blog"
3) (integer) 3
publish
127.0.0.1:6379> publish codehole.image https://www.baidu.com
(integer) 1
127.0.0.1:6379> publish codehole.text "hello,welcome to wuhan"
(integer) 1
127.0.0.1:6379> publish codehole.blog '{"content":"hello,everyone","title":"welcome"}'
(integer) 1
subscribe
# 直接订阅codehole主题下的所有消息
127.0.0.1:6379> psubscribe codehole.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "codehole.*"
3) (integer) 1
1) "pmessage"
2) "codehole.*"
3) "codehole.blog"
4) "{\"content\":\"hello,everyone\",\"title\":\"welcome\"}"
消息结构
{'pattern': None, 'type': 'subscribe', 'channel': 'codehole', 'data': 1L}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'python comes'}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'java comes'}
{'pattern': None, 'type': 'message', 'channel': 'codehole', 'data': 'golang comes'}
pattern
: 那种方式订阅的,subscribe
指令订阅的,这个字段就是空
type
:消息的类型,message,subscribe,psubscribe,unsubscribe
channel
:订阅的主题
data
:消息内容,一个字符串
PubSub
缺陷
Producer
产出消息,Redis
会找对应的Consumer
,如果有3个Consumer
这个时候一个Consumer
挂掉,Producer
会发送消息,另外两个Consumer
正常接收消息,但是挂掉的Consumer
脸上,这个Consumer
就丢失了
线程 I/O
模型
非阻塞I/O
套接字读写方法
read
传递进去一个参数n
,表示读入这么多字节的数据后返回,如果没读够thread
会卡在哪里,读够之后read
方法返回
write
一般不会阻塞,除了缓冲区满了,write
方法才会阻塞
**非阻塞I/O
**就是在读写的时候不阻塞,Read
取决于读缓冲区的数据字节数,Write
取决于写缓冲区的空闲字节数
事件轮询(多路复用)
问题:线程不知道什么时候继续读,什么时候继续写数据
解决方案:使用事件轮询api
解决
select
函数
select
是操作系统提供的API
,提供了read_fds
和write_fds
操作符,同时提供timeout
,刚开始线程处于阻塞状态,如果有:
- 有任何事件到来,就会返回
timeout
时间到达也会返回
read_events, write_events = select(read_fds, write_fds, timeout)
for event in read_events:handle_read(event.fd)
for event in write_events:handle_write(event.fd)
handle_others() # 处理其它事情,如定时任务等
我们通过select
同时处理多个操作符,这种叫做多路复用API
,现在用的epoll(linux)
和kqueue(freebsd&macosx)
,因为select
在描述符
多的时候性能会很差
服务器套接字
websocket
是调用accept
接受Client
新连接,也是通过select
系统调用新事件通知的
指令队列
Redis
每个Client
都会关联一个指令队列,指令会队列排队处理,FIFO