浅谈Redis分布式锁(中)

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

我们在不久前介绍了SpringBoot定时任务,最近又一起探究了如何使用Redis实现简单的消息队列,都是一些不错的小知识点。为了能跟前面的内容产生联动,这次我们打算把Redis分布式锁相关的介绍融合进定时任务的案例中,学起来更带劲~

Demo构思

在我看来,同样需要使用锁,动机可能完全相反:

  • 在保证线程安全的前提下,尽量让所有线程都执行成功
  • 在保证线程安全的前提下,只让一个线程执行成功

前者适用于秒杀等场景。作为商家,当然希望在不发生线程安全问题的前提下,让每一个订单都生效,直到商品售罄。此时分布式锁的写法可以是“不断重试”“阻塞等待”,即:递归或while true循环尝试获取、阻塞等待。

而后者适用于分布式系统或多节点项目的定时任务,比如同一份代码部署在A、B两台服务器上,而数据库共用同一个。如果不做限制,那么在同一时刻,两台服务器都会去拉取列表执行,会发生任务重复执行的情况。

此时可以考虑使用分布式锁,在cron触发的时刻只允许一个线程去往数据库拉取任务:

在实现Redis分布式锁控制定时任务唯一性的同时,我们引入之前的Redis消息队列。注意,这与Redis分布式锁本身无关,就是顺便复习一遍Redis消息队列而已,大家可以只实现Redis分布式锁+定时任务的部分。

整个Demo的结构大致如图:

当然,实际项目中一般是这样的:

分布式锁为什么难设计?

首先,要和大家说一下,但凡牵涉到分布式的处理,没有一个是简单的,上面的Demo设计也不过是玩具,用来启发 大家的思路。

为什么要把Demo设计得这么复杂呢?哈哈,因为这是我在上一家公司自己设计的,遇到了很多坑...拿出来自嘲一番,与各位共勉。

我当时的设计思路是:

由于小公司没有用什么Elastic-Job啥的,就是很普通的多节点部署。为了避免任务重复执行,我想设计一个分布式锁。但因为当时根本不知道Redisson,所以就自己百度了Redis实现分布式锁的方式,然后依葫芦画瓢自己手写了一个 。

但我写完Redis分布式锁后,在实际测试过程中发现还需要考虑锁的失效时间...

这里有两个问题:

  • 为什么要设置锁的过期时间?
  • 锁的过期时间设置多久合适?

最简单的实现方案是这样的,一般没问题:

但极端的情况下(项目在任务进行时重启或意外宕机),可能当前任务来不及解锁就挂了(死锁),那么下一个任务就会一直被锁在方法外等待。就好比厕所里有人被熏晕了,没法开门,而外面的人又进不去...

此时需要装一个自动解锁的门,到时间自动开门,也就是要给锁设置一个过期时间。但紧接着又会有第二个问题:锁的失效时间设多长合适?

很难定。

因为随着项目的发展,定时任务的执行时间很可能是变化的。

如果设置时间过长,极端点,定为365天。假设任务正常执行,比如10分钟就结束,那么线程继续往下就会执行unLock()主动解锁。但万一和上面一样宕机了,那么这个锁就要等365天后才解开。注意,宕机可不像JVM异常,它压根不会去执行finally里的unLock()。这种情况好比有个人在厕所里上大号直接掉坑里了,而自动门默认365天打开…所以,锁过期时间设置过长的坏处,本质是一旦发生宕机来不及解锁,那么过期时间越长,影响面越广,会导致其他操作阻滞。

如果设置时间过短,上一个人还没拉完,门就“咔嚓”一声开了,尴尬不,重复执行了。

终上所述,我当时之所以设计得这么复杂,就是想尽量缩短任务执行的时间,让它尽可能短(拉取后直接丢给队列,自己不处理),这样锁的时间一般设置30分钟就没啥问题。另外,对于死锁问题,我当时没有考虑宕机的情况,只考虑了意外重启…问题还有很多,文末会再总结。

请大家阅读下面代码时思考两个问题:

  • Demo如何处理锁的过期时间
  • Demo如何防止死锁

项目搭建

新建一个空的SpringBoot项目。

拷贝下方代码,构建工程:

构建完以后,拷贝一份,修改端口号为8081,避免和原先的冲突

统一管理Redis Key:RedisKeyConst

/*** 统一管理Redis Key** @author mx*/
public final class RedisKeyConst {/*** 分布式锁的KEY*/public static final String RESUME_PULL_TASK_LOCK = "resume_pull_task_lock";/*** 简历异步解析任务队列*/public static final String RESUME_PARSE_TASK_QUEUE = "resume_parse_task_queue";
}

Redis消息队列:RedisMessageQueueConsumer

/*** 消费者,异步获取简历解析结果并存入数据库** @author mx*/
@Slf4j
@Component
public class RedisMessageQueueConsumer implements ApplicationListener<ContextRefreshedEvent> {@Autowiredprivate RedisService redisService;@Autowiredprivate AsyncResumeParser asyncResumeParser;@Autowiredprivate ObjectMapper objectMapper;@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {log.info("开始监听RedisMessageQueue...");CompletableFuture.runAsync(() -> {// 大循环,不断监听队列任务(阻塞式)while (true) {// 阻塞监听ResumeCollectionDTO resumeCollectionDTO = (ResumeCollectionDTO) redisService.popQueue(RedisKeyConst.RESUME_PARSE_TASK_QUEUE, 5, TimeUnit.SECONDS);if (resumeCollectionDTO != null) {int rePullCount = 0;int retryCount = 0;log.info("从队列中取出:{}", resumeCollectionDTO.getName());log.info(">>>>>>>>>>>>>>>>>>>开始拉取简历:{}", resumeCollectionDTO.getName());Long asyncPredictId = resumeCollectionDTO.getAsyncPredictId();// 小循环,针对每一个任务多次调用第三方接口,直到获取最终结果或丢弃任务while (true) {try {PredictResult result = asyncResumeParser.getResult(asyncPredictId);rePullCount++;// 如果已经解析完毕if (result.getStatus() == 2) {// 保存数据库try {log.info("简历:{}解析成功", resumeCollectionDTO.getName());log.info("resultJson:{}", result.getResultJson());ResumeCollectionDO resumeCollectionDO = objectMapper.readValue(result.getResultJson(), ResumeCollectionDO.class);log.info("<<<<<<<<<<<<<<<<<<<保存简历:{}到数据库", resumeCollectionDO);// 归零rePullCount = 0;retryCount = 0;break;} catch (Exception e) {discardTask(resumeCollectionDTO);log.info("<<<<<<<<<<<<<<<<<<<保存简历失败,丢弃任务");rePullCount = 0;retryCount = 0;break;}}// 远程服务还未解析完毕,重试else {try {if (rePullCount <= 3) {// 前3次重试,时间为1s间隔TimeUnit.SECONDS.sleep(1);log.info("简历:{}尚未解析完毕, 准备进行第{}次重试, 停顿1s后进行", resumeCollectionDTO.getName(), rePullCount);} else if (rePullCount > 3 && rePullCount <= 6) {// 说明任务比较耗时,加长等待时间TimeUnit.SECONDS.sleep(2);log.info("简历:{}尚未解析完毕, 准备进行第{}次重试, 停顿2s后进行", resumeCollectionDTO.getName(), rePullCount);} else if (rePullCount > 6 && rePullCount <= 8) {// 说明任务比较耗时,加长等待时间TimeUnit.SECONDS.sleep(3);log.info("简历:{}尚未解析完毕, 准备进行第{}次重试, 停顿3s后进行", resumeCollectionDTO.getName(), rePullCount);} else {discardTask(resumeCollectionDTO);log.info("<<<<<<<<<<<<<<<<<<<多次拉取仍未得到结果, 丢弃简历:{}", resumeCollectionDTO.getName());retryCount = 0;rePullCount = 0;break;}} catch (InterruptedException e) {discardTask(resumeCollectionDTO);log.info("<<<<<<<<<<<<<<<<<<<任务中断异常, 简历:{}", resumeCollectionDTO.getName());rePullCount = 0;retryCount = 0;break;}}} catch (Exception e) {if (retryCount > 3) {discardTask(resumeCollectionDTO);log.info("<<<<<<<<<<<<<<<<<<<简历:{}重试{}次后放弃, rePullCount:{}, retryCount:{}", resumeCollectionDTO.getName(), retryCount, rePullCount, retryCount);rePullCount = 0;retryCount = 0;break;}retryCount++;log.info("简历:{}远程调用异常, 准备进行第{}次重试...", resumeCollectionDTO.getName(), retryCount);}}log.info("break......");}}});}private void discardTask(ResumeCollectionDTO task) {// 根据asyncPredictId删除任务...log.info("丢弃任务:{}...", task.getName());}}

实体类:DO+DTO

@Data
@NoArgsConstructor
@AllArgsConstructor
public class ResumeCollectionDO {/*** 简历id*/private Long id;/*** 简历名称*/private String name;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ResumeCollectionDTO implements Serializable {/*** 简历id*/private Long id;/*** 异步解析id,稍后根据id可获取最终解析结果*/private Long asyncPredictId;/*** 简历名称*/private String name;
}

分布式锁:RedisService

public interface RedisService {/*** 向队列插入消息** @param queue 自定义队列名称* @param obj   要存入的消息*/void pushQueue(String queue, Object obj);/*** 从队列取出消息** @param queue    自定义队列名称* @param timeout  最长阻塞等待时间* @param timeUnit 时间单位* @return*/Object popQueue(String queue, long timeout, TimeUnit timeUnit);/*** 尝试上锁** @param lockKey* @param value* @param expireTime* @param timeUnit* @return*/boolean tryLock(String lockKey, String value, long expireTime, TimeUnit timeUnit);/*** 根据MACHINE_ID解锁(只能解自己的)** @param lockKey* @param value* @return*/boolean unLock(String lockKey, String value);/*** 释放锁,不管是不是自己的** @param lockKey* @param value* @return*/boolean releaseLock(String lockKey, String value);}
@Slf4j
@Component
public class RedisServiceImpl implements RedisService {@Autowiredprivate RedisTemplate redisTemplate;/*** 向队列插入消息** @param queue 自定义队列名称* @param obj   要存入的消息*/@Overridepublic void pushQueue(String queue, Object obj) {redisTemplate.opsForList().leftPush(queue, obj);}/*** 从队列取出消息** @param queue    自定义队列名称* @param timeout  最长阻塞等待时间* @param timeUnit 时间单位* @return*/@Overridepublic Object popQueue(String queue, long timeout, TimeUnit timeUnit) {return redisTemplate.opsForList().rightPop(queue, timeout, timeUnit);}/*** 尝试上锁** @param lockKey* @param value* @param expireTime* @param timeUnit* @return*/@Overridepublic boolean tryLock(String lockKey, String value, long expireTime, TimeUnit timeUnit) {Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, value);if (Boolean.TRUE.equals(lock)) {redisTemplate.expire(lockKey, expireTime, timeUnit);return true;} else {return false;}}/*** 根据MACHINE_ID解锁(只能解自己的)** @param lockKey* @param value* @return*/@Overridepublic boolean unLock(String lockKey, String value) {String machineId = (String) redisTemplate.opsForValue().get(lockKey);if (StringUtils.isNotEmpty(machineId) && machineId.equals(value)) {redisTemplate.delete(lockKey);return true;}return false;}/*** 释放锁,不管是不是自己的** @param lockKey* @param value* @return*/@Overridepublic boolean releaseLock(String lockKey, String value) {Boolean delete = redisTemplate.delete(lockKey);if (Boolean.TRUE.equals(delete)) {log.info("Spring启动,节点:{}成功释放上次简历汇聚定时任务锁", value);return true;}return false;}}

定时任务:ResumeCollectionTask

@Slf4j
@Component
@EnableScheduling
public class ResumeCollectionTask implements ApplicationListener<ContextRefreshedEvent> {/*** 当这份代码被部署到不同的服务器,启动时为每台机器分配一个唯一的机器ID*/private static final String MACHINE_ID = IdUtil.randomUUID();@Autowiredprivate RedisService redisService;@Autowiredprivate AsyncResumeParser asyncResumeParser;@Scheduled(cron = "0 */1 * * * ?")
//    @Scheduled(fixedDelay = 60 * 1000L)public void resumeSchedule() {// 尝试上锁,返回true或false,锁的过期时间设置为10分钟(实际要根据项目调整,这也是自己实现Redis分布式锁的难点之一)boolean lock = redisService.tryLock(RedisKeyConst.RESUME_PULL_TASK_LOCK, MACHINE_ID, 10, TimeUnit.MINUTES);// 如果当前节点成功获取锁,那么整个系统只允许当前程序去MySQL拉取待执行任务if (lock) {log.info("节点:{}获取锁成功,定时任务启动", MACHINE_ID);try {collectResume();} catch (Exception e) {log.info("定时任务异常:", e);} finally {redisService.unLock(RedisKeyConst.RESUME_PULL_TASK_LOCK, MACHINE_ID);log.info("节点:{}释放锁,定时任务结束", MACHINE_ID);}} else {log.info("节点:{}获取锁失败,放弃定时任务", MACHINE_ID);}}/*** 任务主体:* 1.从数据库拉取符合条件的HR邮箱* 2.从HR邮箱拉取附件简历* 3.调用远程服务异步解析简历* 4.插入待处理任务到数据库,作为记录留存* 5.把待处理任务的id丢到Redis Message Queue,让Consumer去异步处理*/private void collectResume() throws InterruptedException {// 跳过1、2两步,假设已经拉取到简历log.info("节点:{}从数据库拉取任务简历", MACHINE_ID);List<ResumeCollectionDO> resumeCollectionList = new ArrayList<>();resumeCollectionList.add(new ResumeCollectionDO(1L, "张三的简历.pdf"));resumeCollectionList.add(new ResumeCollectionDO(2L, "李四的简历.html"));resumeCollectionList.add(new ResumeCollectionDO(3L, "王五的简历.doc"));// 模拟数据库查询耗时TimeUnit.SECONDS.sleep(3);log.info("提交任务到消息队列:{}", resumeCollectionList.stream().map(ResumeCollectionDO::getName).collect(Collectors.joining(",")));for (ResumeCollectionDO resumeCollectionDO : resumeCollectionList) {// 上传简历异步解析,得到异步结果idLong asyncPredictId = asyncResumeParser.asyncParse(resumeCollectionDO);// 把任务插入数据库// 略...// 把任务丢到Redis Message QueueResumeCollectionDTO resumeCollectionDTO = new ResumeCollectionDTO();BeanUtils.copyProperties(resumeCollectionDO, resumeCollectionDTO);resumeCollectionDTO.setAsyncPredictId(asyncPredictId);redisService.pushQueue(RedisKeyConst.RESUME_PARSE_TASK_QUEUE, resumeCollectionDTO);}}/*** 项目重启后先尝试删除之前的锁(如果存在),防止死锁等待** @param event the event to respond to*/@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {redisService.releaseLock(RedisKeyConst.RESUME_PULL_TASK_LOCK, MACHINE_ID);}}

模拟第三方服务(异步)

/*** 第三方提供给的简历解析服务** @author mx*/
@Service
public class AsyncResumeParser {@Autowiredprivate ObjectMapper objectMapper;/*** 模拟分配异步任务结果id,不用深究,没啥意义,反正每个任务都会得到一个id,稍后根据id返回最终解析结果*/private static final AtomicLong ASYNC_RESULT_ID = new AtomicLong(1000);/*** 解析结果*/private static final Map<Long, String> results = new HashMap<>();/*** 模拟第三方服务异步解析,返回解析结果** @param resumeCollectionDO* @return*/public Long asyncParse(ResumeCollectionDO resumeCollectionDO) {long asyncPredictId = ASYNC_RESULT_ID.getAndIncrement();try {String resultJson = objectMapper.writeValueAsString(resumeCollectionDO);results.put(asyncPredictId, resultJson);return asyncPredictId;} catch (JsonProcessingException e) {e.printStackTrace();}return -1L;}/*** 根据异步id返回解析结果,但此时未必已经解析成功* <p>* 解析状态* 0 初始化* 1 处理中* 2 调用成功* 3 调用失败** @param asyncPredictId* @return*/public PredictResult getResult(Long asyncPredictId) throws ParseErrorException, InterruptedException {// 随机模拟异步解析的状态int value = ThreadLocalRandom.current().nextInt(100);if (value >= 85) {// 模拟解析完成TimeUnit.SECONDS.sleep(1);String resultJson = results.get(asyncPredictId);return new PredictResult(resultJson, 2);} else if (value <= 5) {// 模拟解析异常TimeUnit.SECONDS.sleep(1);throw new ParseErrorException("简历解析异常");}// 如果时间过短,返回status=1,表示解析中TimeUnit.SECONDS.sleep(1);return new PredictResult("", 1);}}
/*** 解析异常** @author mx*/
public class ParseErrorException extends Exception {/*** Constructs a new exception with {@code null} as its detail message.* The cause is not initialized, and may subsequently be initialized by a* call to {@link #initCause}.*/public ParseErrorException() {}/*** Constructs a new exception with the specified detail message.  The* cause is not initialized, and may subsequently be initialized by* a call to {@link #initCause}.** @param message the detail message. The detail message is saved for*                later retrieval by the {@link #getMessage()} method.*/public ParseErrorException(String message) {super(message);}
}
/*** 第三方返回值** @author mx*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PredictResult {/*** 解析结果*/private String resultJson;/*** 解析状态* 0 初始化* 1 处理中* 2 调用成功* 3 调用失败*/private Integer status;
}

模拟异常

在项目运行过程中,启动这个测试类的方法,即可观察不一样的现象。

@SpringBootTest
class RedisDistributedLockApplicationTests {@Autowiredprivate RedisService redisService;/*** 作为失败案例(因为不存在777L这个解析任务,AsyncResumeParse.results会返回null)* 观察RedisMessageQueueConsumer的处理方式*/@Testvoid contextLoads() {ResumeCollectionDTO resumeCollectionDTO = new ResumeCollectionDTO();resumeCollectionDTO.setId(666L);resumeCollectionDTO.setAsyncPredictId(777L);resumeCollectionDTO.setName("测试1号");redisService.pushQueue(RedisKeyConst.RESUME_PARSE_TASK_QUEUE, resumeCollectionDTO);}}

pom.xml

server:port: 8080spring:redis:host: password:  database: 2

效果展示

啥都不说了,都在jiu代码里了。大家自己拷贝到本地,动手玩一下,加深对Redis锁和Redis消息队列的理解。

 

只有一个定时任务能去数据库拉取任务,到时多节点部署大致是下面这样(redis一般是独立部署的,和节点代码无关):

后话

上面展示的代码其实存在很多问题,我们会在下一篇指出并讨论解决方案。

本文仅提供思路,开阔大家的眼界,千万别在自己项目中使用!!!!我当年被这个坑惨了,花里胡哨的,尤其Consumer里一大堆的sleep(),是非常low的!!

对于异步调用的结果,不要循环等待,而应该分为几步:

  1. 调用异步接口,得到异步结果唯一id
  2. 将结果id保存到任务表中,作为一个任务
  3. 启动定时任务,根据id拉取最终结果(如果还没有结果,跳过当前任务,等下一个定时任务处理)

分布式定时任务可以考虑xxl-job或elastic-job,分布式锁推荐使用redisson。

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

进群,大家一起学习,一起进步,一起对抗互联网寒冬

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/296425.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于电商场景的高并发RocketMQ实战-NameServer内核原理剖析、Broker 主从架构与集群模式原理分析

&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308; 【11来了】文章导读地址&#xff1a;点击查看文章导读&#xff01; &#x1f341;&#x1f341;&#x1f341;&#x1f341;&#x1f341;&#x1f341;&#x1f3…

线性回归简介

线性回归简介 1、情景描述2、线性回归 1、情景描述 假设&#xff0c;我们现在有这么一张图&#xff1a; 其中&#xff0c;横坐标x表示房子的面积&#xff0c;纵坐标y表示房价。我们猜想x与y之间存在线性关系&#xff1a; y k x b ykxb ykxb 现在&#xff0c;思考一个问题&…

Redis数据一致解决方案

文章目录 前言技术积累查询缓存业务流程更新缓存业务流程 更新缓存问题解决方案写在最后 前言 当前的应用服务很多都有着高并发的业务场景&#xff0c;对于高并发的解决方案一般会用到缓存来降低数据库压力&#xff0c;并且还能够提高系统性能减少请求耗时&#xff0c;比如我们…

Pytorch项目,肺癌检测项目之四

# 安装图像处理 的两个包 simpleITK 和 ipyvolume # 安装缓存相关的两个包 diskcache 和 cassandra-driver import gzip from diskcache import FanoutCache, Disk from cassandra.cqltypes import BytesType from diskcache import FanoutCache,Disk,core from diskcache…

c语言的练习---BCD解密

#继续源于c语言翁恺先生 一.分析 初看这道题的时候&#xff0c;可能很多人就想选择放弃&#xff0c;但这道题实在不是考察我们对于编码的能力&#xff1b;而是我们的数学能力。 就拿它的输入样例---18&#xff0c;来举例。 我们来看---在十进制中&#xff0c;是18D&#xf…

论文笔记--Learning Political Polarization on Social Media Using Neural Networks

论文笔记--Learning Political Polarization on Social Media Using Neural Networks 1. 文章简介2. 文章概括3. 相关工作4. 文章重点技术4.1 Collection of posts4.1.1 数据下载4.1.2 数据预处理4.1.3 统计显著性分析 4.2 Classification of Posts4.3 Polarization of users 5…

C++ vector的模拟实现

一 vector的大致框架 1.1 框架 vector的成员变量不再是我们熟悉的size&#xff0c;capacity&#xff0c;而是变成了功能一致的三个指针&#xff1a;_start,_finish,_endofstorage&#xff0c;三个指针的作用如下&#xff1a; 同时&#xff0c;因为其本身指针的特性&#xff0c…

工具:meson+ninja(安装问题解决)

问题1&#xff1a;Python版本问题 报错信息&#xff1a; NOTICE: You are using Python 3.6 which is EOL. Starting with v0.62.0, Meson will require Python 3.7 or newer ubuntu 18默认的python3是3.6. 解决方案1&#xff1a;从源码安装python 3.7 wget https://www.pyth…

阿里云江苏省中小企业补贴5000元上云补贴金

阿里云「数智惠企」中小企业补贴&#xff0c;江苏区域企业提交申请内部评估及审批通过后&#xff0c;即可获取上云补贴金&#xff0c;使用补贴金购买指定云产品&#xff0c;满10000元即可立减5000元&#xff0c;请抓紧申领。阿里云百科 aliyunbaike.com 分享江苏区域5000元上云…

Postman创建及删除workspace工作空间

文章目录 一、Postman创建workspace工作空间二、Postman删除workspace工作空间 一、Postman创建workspace工作空间 打开Postman 点击 Workspaces → Create Workspaces 如图所示操作 工作空间创建完成 二、Postman删除workspace工作空间 点击 Workspaces → 选择要删除…

【pynput】鼠标行为追踪并模拟

文章目录 前言基本思路安装依赖包实时鼠标捕获捕获鼠标位置捕获鼠标事件记录点击内容 效果图 利用本文内容从事的任何犯法行为和开发与本人无关&#xff0c;请理性利用技术服务大家&#xff0c;创建美好和谐的社会&#xff0c;让人们生活从繁琐中变得更加具有创造性&#xff01…

欠采样对二维相位展开的影响

1.前言 如前所述&#xff0c;相位展开器通过计算两个连续样本之间的差来检测图像中包裹的存在。如果这个差值大于π或小于-π&#xff0c;则相位展开器认为在这个位置存在包裹。这可能是真正的相位包络&#xff0c;也可能是由噪声或采样不足引起的伪包络。 对欠采样的相位图像…