JavaWeb_LeadNews_Day5-文章定时发布

JavaWeb_LeadNews_Day5-文章定时发布

  • 延时服务
    • 概述
      • DelayQueue
      • RabbitMQ(常用)
      • Redis(常用)
    • redis延迟服务
      • 实现思路
        • 总思路
        • 添加任务
        • 取消任务
        • 拉取任务
        • 未来数据定时刷新
        • redis解决集群抢占
      • 具体实现
        • 乐观锁
        • docker运行redis
        • 项目集成redis
        • 添加任务
        • 取消任务
        • 拉取任务
        • 未来数据定时刷新
        • redis解决集群抢占
        • 数据库定时同步
        • 远程接口
        • 发布文章添加延迟任务
        • 拉取任务
      • 总结
        • redis延迟服务
        • 数据库
        • 未来数据定时刷新
          • redis key值匹配
  • 来源
  • Gitee

延时服务

概述

DelayQueue

  • JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列,内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口
  • 在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素
  • 使用DelayQueue作为延迟任务,如果程序挂掉之后,任务都是放在内存,消息会丢失,如何保证数据不丢失

RabbitMQ(常用)

  • TTL: Time To Live(消息存活时间)
  • 死信队列: Dead Letter Exchange(死信交换机), 当消息成为Dead message后, 可以重新发送另一个交换机(死信交换机)

Redis(常用)

  • zset数据类型的去重有序(分数排序)特点进行延迟. 例如: 时间戳(毫秒值)作为score进行排序

redis延迟服务

实现思路

总思路

添加任务

  1. 添加任务到数据库
  2. 添加任务到redis
    1. 如果任务的执行时间小于等于当前时间存入list
    2. 如果任务的执行时间大于当前时间, 小于等于预设时间(未来5分钟)存入zset

取消任务

  • 场景: 第三方接口网络不通, 使用延迟任务进行重试, 达到阈值后, 取消任务
  1. 根据taskId删除任务, 修改任务日志状态为2(取消)
  2. 删除redis中对应的任务数据, 包括listzset

拉取任务

  1. 消费任务, 参数为: 任务的类型和优先级
  2. 从redis的list中pop数据, pop: 取出数据并删除
  3. 删除任务和更新任务日志

未来数据定时刷新

  1. 每分钟获取未来数据的keys
  2. 按照分值查询zset, 判断数据是否到期
  3. 到期数据同步到list中

redis解决集群抢占

  • 分布式锁: 控制分布式系统有序的去对共享资源进行操作, 通过互斥来保证数据的一致性
  • 解决方案
    方案说明
    数据库基于表的唯一索引
    zookeeper根据zookeep中的临时有序节点排序
    redis使用setnx命令完成
  • redis分布式锁: setnx(set if not exists)命令在指定的key不存在时, 为key设置指定的值

具体实现

乐观锁

// @Version注解
public class TaskinfoLogs implements Serializable {.../*** 版本号,用乐观锁*/@Versionprivate Integer version;...
}// Interceptor
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());return interceptor;
}

docker运行redis

// 拉取镜像
docker pull redis// 创建运行容器
docker run -d --name redis -p 6379:6379 redis --requirepass "leadnews"

项目集成redis

  • 依赖
    <!--spring data redis & cache-->
    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- redis依赖commons-pool 这个依赖一定要添加 -->
    <dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
    </dependency>
    
  • 配置
    spring:redis:host: 192.168.174.133password: leadnewsport: 6379
    
  • 工具类
    // 太长了, 从仓库获取// 配置spring.factories
    
  • 测试类
    @SpringBootTest(classes = ScheduleApplication.class)
    @RunWith(SpringRunner.class)
    public class RedisTest {@Autowiredprivate CacheService cacheService;@Testpublic void testList(){//在list的左边添加元素//cacheService.lLeftPush("list_001","hello,redis");//在list的右边获取元素,并删除String list_001 = cacheService.lRightPop("list_001");System.out.println(list_001);}@Testpublic void testZset(){//添加数据到zset中  分值cacheService.zAdd("zset_key_001","hello zset 001",1000);cacheService.zAdd("zset_key_001","hello zset 002",8888);cacheService.zAdd("zset_key_001","hello zset 003",7777);cacheService.zAdd("zset_key_001","hello zset 004",999999);//按照分值获取数据Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);System.out.println(zset_key_001);}
    }
    

添加任务

// Service
@Service
@Transactional
public class TaskServiceImpl implements TaskService {@Overridepublic long addTask(Task task) {// 1. 添加任务到数据库中boolean success = addTaskToDb(task);// 2. 添加任务到redisif(success){addTaskToCache(task);}return task.getTaskId();}@Autowiredprivate CacheService cacheService;private void addTaskToCache(Task task) {String key = task.getTaskType()+"_"+task.getPriority();// 获取5分钟后的时间, 毫秒值Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);long nextScheduleTime = calendar.getTimeInMillis();if(task.getExecuteTime() <= System.currentTimeMillis()){// 2.1 如果任务的执行时间小于等于当前时间, 存入listcacheService.lLeftPush(ScheduleConstants.TOPIC+key, JSON.toJSONString(task));}else if(task.getExecuteTime() <= nextScheduleTime){// 2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟), 存入zsetcacheService.zAdd(ScheduleConstants.FUTURE+key, JSON.toJSONString(task), task.getExecuteTime());}}@Autowiredprivate TaskinfoMapper taskinfoMapper;@Autowiredprivate TaskinfoLogsMapper taskinfoLogsMapper;private boolean addTaskToDb(Task task) {boolean flag = false;try {// 保存任务表Taskinfo taskinfo = new Taskinfo();BeanUtils.copyProperties(task, taskinfo);taskinfo.setExecuteTime(new Date(task.getExecuteTime()));taskinfoMapper.insert(taskinfo);// 设置taskIdtask.setTaskId(taskinfo.getTaskId());// 保存任务日志数据TaskinfoLogs taskinfoLogs = new TaskinfoLogs();BeanUtils.copyProperties(taskinfo, taskinfoLogs);taskinfoLogs.setVersion(1);taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);taskinfoLogsMapper.insert(taskinfoLogs);flag = true;} catch (BeansException e) {e.printStackTrace();}return flag;}
}// Test
@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class TaskServiceImplTest {@Autowiredprivate TaskService taskService;@Testpublic void addTask() {Task task = new Task();task.setTaskType(100);task.setExecuteTime(new Date().getTime()+500000);task.setParameters("Hello Task!".getBytes());task.setPriority(50);long taskId = taskService.addTask(task);System.out.println(taskId);}
}

取消任务

// Service
public boolean cancelTask(Long taskId) {boolean flag = false;// 删除任务, 修改任务日志状态为2(取消)Task task = updateDb(taskId, ScheduleConstants.CANCELLED);// 删除redis中的数据if(task != null) {removeTaskFromCache(task);flag = true;}return flag;
}/*** 删除redis中的数据* @param task*/
private void removeTaskFromCache(Task task) {String key = task.getTaskType()+"_"+task.getPriority();if(task.getExecuteTime() <= System.currentTimeMillis()){cacheService.lRemove(ScheduleConstants.TOPIC+key, 0, JSON.toJSONString(task));}else{cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));}
}/*** 删除任务, 更新任务日志* @param taskId* @param cancelled* @return*/
private Task updateDb(Long taskId, int cancelled) {Task task = null;try {// 删除任务taskinfoMapper.deleteById(taskId);// 更新任务日志TaskinfoLogs taskLog = taskinfoLogsMapper.selectById(taskId);taskLog.setStatus(cancelled);taskinfoLogsMapper.updateById(taskLog);task = new Task();BeanUtils.copyProperties(taskLog, task);task.setExecuteTime(taskLog.getExecuteTime().getTime());} catch (Exception e) {log.error("task cancel exception taskId={}", taskId);}return task;
}

拉取任务

public Task poll(Integer type, Integer priority) {Task task = null;try {String key = type+"_"+priority;// 从redis中拉取数据 popString task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);if(StringUtils.isNotBlank(task_json)){task = JSON.parseObject(task_json, Task.class);// 修改数据库信息updateDb(task.getTaskId(), ScheduleConstants.EXECUTED);}} catch (Exception e) {e.printStackTrace();log.error("poll task exception");}return task;
}

未来数据定时刷新

@Scheduled(cron = "0 */1 * * * ?")
public void refresh()
{log.info("未来数据定时刷新---定时任务");// 获取所有未来数据的集合keySet<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");for (String futureKey : futureKeys) {// 获取当前数据的key topicString topiKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];// 按照key和分值查询符合条件的数据Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());// 同步数据if(!tasks.isEmpty()){cacheService.refreshWithPipeline(futureKey, topiKey, tasks);log.info("成功的将"+futureKey+"刷新到了"+topiKey);}}
}

redis解决集群抢占

public String tryLock(String name, long expire) {name = name + "_lock";String token = UUID.randomUUID().toString();RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();RedisConnection conn = factory.getConnection();try {//参考redis命令://set key value [EX seconds] [PX milliseconds] [NX|XX]Boolean result = conn.set(name.getBytes(),token.getBytes(),Expiration.from(expire, TimeUnit.MILLISECONDS),RedisStringCommands.SetOption.SET_IF_ABSENT //NX);if (result != null && result)return token;} finally {RedisConnectionUtils.releaseConnection(conn, factory,false);}return null;
}// use
String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000*30);
if(StringUtils.isNotBlank(token)) {...
}

数据库定时同步

@PostConstruct // 初始化方法
@Scheduled(cron = "0 */5 * * * ?")
public void reloadData()
{// 清理缓存中的数据clearCache();// 查询符合条件的任务 小于未来5分钟的数据Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);List<Taskinfo> taskinfoList = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime()));// 把任务添加到redisif(taskinfoList != null && taskinfoList.size() > 0){for (Taskinfo taskinfo : taskinfoList) {Task task = new Task();BeanUtils.copyProperties(taskinfo, task);task.setExecuteTime(taskinfo.getExecuteTime().getTime());addTaskToCache(task);}}log.info("数据库的数据同步到了redis");
}/*** 清楚缓存中的数据*/
public void clearCache()
{Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");Set<String> topicKeys = cacheService.scan(ScheduleConstants.TOPIC + "*");cacheService.delete(futureKeys);cacheService.delete(topicKeys);
}

远程接口

@RestController
public class ScheduleClient implements IScheduleClient {@Autowiredprivate TaskService taskService;/*** 添加任务*/@PostMapping("/api/v1/task/add")public ResponseResult addTask(Task task){return ResponseResult.okResult(taskService.addTask(task));}/*** 取消任务*/@GetMapping("/api/v1/task/{taskId}")public ResponseResult cancelTask(@PathVariable("taskId") Long taskId){return ResponseResult.okResult(taskService.cancelTask(taskId));}/*** 按照类型和优先级拉取任务*/@GetMapping("/api/v1/task/{type}/{priority}")public ResponseResult poll(@PathVariable("type") Integer type, @PathVariable("priority") Integer priority){return ResponseResult.okResult(taskService.poll(type, priority));}
}

发布文章添加延迟任务

  • 枚举类
    @Getter
    @AllArgsConstructor
    public enum TaskTypeEnum {NEWS_SCAN_TIME(1001, 1,"文章定时审核"),REMOTEERROR(1002, 2,"第三方接口调用失败,重试");private final int taskType; //对应具体业务private final int priority; //业务不同级别private final String desc; //描述信息
    }
    
  • ProtoStuff依赖
    <dependency><groupId>io.protostuff</groupId><artifactId>protostuff-core</artifactId><version>1.6.0</version>
    </dependency><dependency><groupId>io.protostuff</groupId><artifactId>protostuff-runtime</artifactId><version>1.6.0</version>
    </dependency>
    
  • 添加任务
    @Override
    @Async
    public void addNewsToTask(Integer id, Date publishTime) {Task task = new Task();task.setExecuteTime(publishTime.getTime());task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());WmNews wmNews = new WmNews();wmNews.setId(id);task.setParameters(ProtostuffUtil.serialize(wmNews));scheduleClient.addTask(task);
    }
    

拉取任务

@Scheduled(fixedDelay = 1000)
@Override
public void scanNewsByTask() {log.info("消费任务, 审核文章");ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());if(responseResult.getCode().equals(200) && responseResult.getData() != null){Task task = JSON.parseObject(JSON.toJSONString(responseResult.getData()), Task.class);WmNews wmNews = ProtostuffUtil.deserialize(task.getParameters(), WmNews.class);wmNewsAutoScanService.autoScanWmNews(wmNews.getId());}
}

总结

redis延迟服务

  • 为什么任务需要存储在数据库中?
    • 延迟任务是一个通用的服务,任何有延迟需求的任务都可以调用该服务, 内存数据库的存储是有限的,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑
  • 为什么使用redis中的两种数据类型, list和zset?
    • list存储立即执行的任务, zset存储未来的数据
    • 任务量过大以后, zset的性能会下降, 将立即执行的任务存储在list中, 可以提高性能( list的lpush是O(1), zset的zadd是O(log(n)) )
  • 添加zset数据时, 为什么需要预加载?
    • 如果任务数据特别大, 为了防止阻塞, 只需要把未来几分钟要执行的数据放入缓存即可, 是一种优化的形式.

数据库

  • 数据库准备
    • mysql中, blob是一个二进制大型对象, 是一个可以存储大量数据的容器, longblob最大存储4G
  • 数据库锁
    • 悲观锁(Pessimistic Lock): 拿数据的时候认为别人会修改, 所以每次拿取数据都会上锁
    • 乐观锁(Optimistic Lock): 拿数据的时候认为别人不会修改, 所以不会上锁, 但是在更新的时候会判断一下在此期间别人有没有去更新数据, 可以使用版本号等机制

未来数据定时刷新

redis key值匹配
  • keys模糊匹配

    keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用! 开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞.

  • scan
    SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后,都会向用户返回一个新的游标,用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数,以此来延续之前的迭代过程。

来源

黑马程序员. 黑马头条

Gitee

https://gitee.com/yu-ba-ba-ba/leadnews

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/56576.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Shell】基础语法(二)

文章目录 一、Shell基本语法文件名代换命令代换算术代换转义字符引号 二、Shell脚本语法条件测试分支结构循环 三、总结 一、Shell基本语法 文件名代换 用于匹配的字符称为通配符&#xff08;Wildcard&#xff09;&#xff0c;如&#xff1a;* ? [ ] 具体如下&#xff1a; *…

【SpringBoot】86、SpringBoot中集成Quartz根据Cron表达式获取接下来5次执行时间

本篇文章根据集成 Quartz 根据 Cron 表达式获取接下来的 5 次执行时间,在配置定时任务时,可以清晰地知道自己的 Cron 表达式是否正确,对于 Quartz 不熟悉的同学可以先看看我之前的文章 【SpringBoot】82、SpringBoot集成Quartz实现动态管理定时任务 【SpringBoot】83、Spri…

[CKA]考试之查看pod的cpu

由于最新的CKA考试改版&#xff0c;不允许存储书签&#xff0c;本博客致力怎么一步步从官网把答案找到&#xff0c;如何修改把题做对&#xff0c;下面开始我们的 CKA之旅 题目为&#xff1a; Task 找出标签是namecpu-loader的Pod&#xff0c;并过滤出使用CPU最高的Pod&#…

20230802-下载并安装android-studio

下载 android-studio 安装包 https://developer.android.google.cn/studio/ 安装android-studio 双击安装包 D:\Android Studio

无涯教程-Perl - foreach 语句函数

foreach 循环遍历列表值&#xff0c;并将控制变量(var)依次设置为列表的每个元素- foreach - 语法 Perl编程语言中的 foreach 循环的语法是- foreach var (list) { ... } foreach - 流程图 foreach - 示例 #!/usr/local/bin/perllist(2, 20, 30, 40, 50);# foreach loop ex…

HTTP和HTTPS

目录 HTTP和HTTPS的基本概念(应用层协议&#xff09; HTTP的版本 HTTP与HTTPS有什么区别&#xff1f; HTTP的工作原理 HTTPS的工作原理 HTTPS的优点 HTTPS的缺点&#xff1a; HTTPS的优缺点&#xff08;总结&#xff09; 对称加密 非对称加密 证书 HTTPS的加密 HTT…

Vue3 第二节 Vue3的响应式

1.Vue3的响应式原理 2.ref函数和reactive函数的对比 3.setup注意点 一.Vue3的响应式原理 1.Vue2.x中的响应式原理 ① 实现原理 对象类型&#xff1a;通过Object.defineProperty() 对属性的读取&#xff0c;修改进行拦截&#xff08;数据劫持&#xff09;数组类型&#xf…

通过cpolar内网穿透发布网页测试

通过内网穿透发布网页测试 文章目录 通过内网穿透发布网页测试 对于网站开发者来说&#xff0c;对完成的网页进行测试十分必要&#xff0c;同时还要在测试过程中充分采纳委托制作方的意见&#xff0c;及时根据甲方意见进行修改&#xff0c;但在传统的测试方式中&#xff0c;必须…

虹科分享 | 新时代“救命神器”:看AR眼镜如何应用于紧急救险场景

从工业时代到如今迎来的“体验时代”&#xff0c;体验即内容&#xff0c;5G、AI、空间计算技术的突破&#xff0c;为各行各业创建了丰富的内容体验模式&#xff0c;让人们能够听之、触之、与之交互。AR是体验时代最具潜力的新技术&#xff0c;在“应急”场景中更是成为了我们在…

Unity 编辑器资源导入处理函数 OnPreprocessAudio :深入解析与实用案例

Unity 编辑器资源导入处理函数 OnPreprocessAudio 用法 点击封面跳转下载页面 简介 在 Unity 中&#xff0c;资源导入是一个非常重要的环节&#xff0c;它决定了资源在项目中的使用方式和效果。Unity 提供了一系列的资源导入处理函数&#xff0c;其中之一就是 OnPreprocessAud…

【vue】vue基础知识

1、插值表达式&属性绑定 <!--template展示给用户&#xff0c;相当于MVVM模式中的V--> <template><div class"first_div">//插值表达式<p>{{ message }}</p>//这里的参数是从父组件的template里传过来的<p>{{data_1}}</p…

大数据-玩转数据-Flink-Transform(上)

一、Transform 转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑. 二、基本转换算子 2.1、map&#xff08;映射&#xff09; 将数据流中的数据进行转换, 形成新的数据流&#xff0c;消费一个元素并产出一个元素…