JavaWeb_LeadNews_Day5-文章定时发布
- 延时服务
- 概述
- DelayQueue
- RabbitMQ(常用)
- Redis(常用)
- redis延迟服务
- 实现思路
- 总思路
- 添加任务
- 取消任务
- 拉取任务
- 未来数据定时刷新
- redis解决集群抢占
- 具体实现
- 乐观锁
- docker运行redis
- 项目集成redis
- 添加任务
- 取消任务
- 拉取任务
- 未来数据定时刷新
- redis解决集群抢占
- 数据库定时同步
- 远程接口
- 发布文章添加延迟任务
- 拉取任务
- 总结
- redis延迟服务
- 数据库
- 未来数据定时刷新
- redis key值匹配
- 来源
- Gitee
延时服务
概述
DelayQueue
- JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列,内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口
- 在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素
- 使用DelayQueue作为延迟任务,如果程序挂掉之后,任务都是放在内存,消息会丢失,如何保证数据不丢失
RabbitMQ(常用)
- TTL: Time To Live(消息存活时间)
- 死信队列: Dead Letter Exchange(死信交换机), 当消息成为Dead message后, 可以重新发送另一个交换机(死信交换机)
Redis(常用)
- zset数据类型的去重有序(分数排序)特点进行延迟. 例如: 时间戳(毫秒值)作为score进行排序
redis延迟服务
实现思路
总思路
添加任务
- 添加任务到
数据库
中 - 添加任务到
redis
中- 如果任务的执行时间小于等于当前时间存入
list
- 如果任务的执行时间大于当前时间, 小于等于预设时间(未来5分钟)存入
zset
中
- 如果任务的执行时间小于等于当前时间存入
取消任务
- 场景: 第三方接口网络不通, 使用延迟任务进行重试, 达到阈值后, 取消任务
- 根据
taskId
删除任务, 修改任务日志状态为2(取消)
- 删除
redis
中对应的任务数据, 包括list
和zset
拉取任务
- 消费任务, 参数为: 任务的类型和优先级
- 从redis的list中pop数据, pop: 取出数据并删除
- 删除任务和更新任务日志
未来数据定时刷新
- 每分钟获取未来数据的keys
- 按照分值查询zset, 判断数据是否到期
- 到期数据同步到list中
redis解决集群抢占
- 分布式锁: 控制分布式系统有序的去对共享资源进行操作, 通过互斥来保证数据的一致性
- 解决方案
方案 说明 数据库 基于表的唯一索引 zookeeper 根据zookeep中的临时有序节点排序 redis 使用setnx命令完成 - redis分布式锁: setnx(set if not exists)命令在指定的key不存在时, 为key设置指定的值
具体实现
乐观锁
// @Version注解
public class TaskinfoLogs implements Serializable {.../*** 版本号,用乐观锁*/@Versionprivate Integer version;...
}// Interceptor
@Bean
public MybatisPlusInterceptor optimisticLockerInterceptor(){MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());return interceptor;
}
docker运行redis
// 拉取镜像
docker pull redis// 创建运行容器
docker run -d --name redis -p 6379:6379 redis --requirepass "leadnews"
项目集成redis
- 依赖
<!--spring data redis & cache--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- redis依赖commons-pool 这个依赖一定要添加 --> <dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId> </dependency>
- 配置
spring:redis:host: 192.168.174.133password: leadnewsport: 6379
- 工具类
// 太长了, 从仓库获取// 配置spring.factories
- 测试类
@SpringBootTest(classes = ScheduleApplication.class) @RunWith(SpringRunner.class) public class RedisTest {@Autowiredprivate CacheService cacheService;@Testpublic void testList(){//在list的左边添加元素//cacheService.lLeftPush("list_001","hello,redis");//在list的右边获取元素,并删除String list_001 = cacheService.lRightPop("list_001");System.out.println(list_001);}@Testpublic void testZset(){//添加数据到zset中 分值cacheService.zAdd("zset_key_001","hello zset 001",1000);cacheService.zAdd("zset_key_001","hello zset 002",8888);cacheService.zAdd("zset_key_001","hello zset 003",7777);cacheService.zAdd("zset_key_001","hello zset 004",999999);//按照分值获取数据Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);System.out.println(zset_key_001);} }
添加任务
// Service
@Service
@Transactional
public class TaskServiceImpl implements TaskService {@Overridepublic long addTask(Task task) {// 1. 添加任务到数据库中boolean success = addTaskToDb(task);// 2. 添加任务到redisif(success){addTaskToCache(task);}return task.getTaskId();}@Autowiredprivate CacheService cacheService;private void addTaskToCache(Task task) {String key = task.getTaskType()+"_"+task.getPriority();// 获取5分钟后的时间, 毫秒值Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);long nextScheduleTime = calendar.getTimeInMillis();if(task.getExecuteTime() <= System.currentTimeMillis()){// 2.1 如果任务的执行时间小于等于当前时间, 存入listcacheService.lLeftPush(ScheduleConstants.TOPIC+key, JSON.toJSONString(task));}else if(task.getExecuteTime() <= nextScheduleTime){// 2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟), 存入zsetcacheService.zAdd(ScheduleConstants.FUTURE+key, JSON.toJSONString(task), task.getExecuteTime());}}@Autowiredprivate TaskinfoMapper taskinfoMapper;@Autowiredprivate TaskinfoLogsMapper taskinfoLogsMapper;private boolean addTaskToDb(Task task) {boolean flag = false;try {// 保存任务表Taskinfo taskinfo = new Taskinfo();BeanUtils.copyProperties(task, taskinfo);taskinfo.setExecuteTime(new Date(task.getExecuteTime()));taskinfoMapper.insert(taskinfo);// 设置taskIdtask.setTaskId(taskinfo.getTaskId());// 保存任务日志数据TaskinfoLogs taskinfoLogs = new TaskinfoLogs();BeanUtils.copyProperties(taskinfo, taskinfoLogs);taskinfoLogs.setVersion(1);taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);taskinfoLogsMapper.insert(taskinfoLogs);flag = true;} catch (BeansException e) {e.printStackTrace();}return flag;}
}// Test
@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class TaskServiceImplTest {@Autowiredprivate TaskService taskService;@Testpublic void addTask() {Task task = new Task();task.setTaskType(100);task.setExecuteTime(new Date().getTime()+500000);task.setParameters("Hello Task!".getBytes());task.setPriority(50);long taskId = taskService.addTask(task);System.out.println(taskId);}
}
取消任务
// Service
public boolean cancelTask(Long taskId) {boolean flag = false;// 删除任务, 修改任务日志状态为2(取消)Task task = updateDb(taskId, ScheduleConstants.CANCELLED);// 删除redis中的数据if(task != null) {removeTaskFromCache(task);flag = true;}return flag;
}/*** 删除redis中的数据* @param task*/
private void removeTaskFromCache(Task task) {String key = task.getTaskType()+"_"+task.getPriority();if(task.getExecuteTime() <= System.currentTimeMillis()){cacheService.lRemove(ScheduleConstants.TOPIC+key, 0, JSON.toJSONString(task));}else{cacheService.zRemove(ScheduleConstants.FUTURE+key, JSON.toJSONString(task));}
}/*** 删除任务, 更新任务日志* @param taskId* @param cancelled* @return*/
private Task updateDb(Long taskId, int cancelled) {Task task = null;try {// 删除任务taskinfoMapper.deleteById(taskId);// 更新任务日志TaskinfoLogs taskLog = taskinfoLogsMapper.selectById(taskId);taskLog.setStatus(cancelled);taskinfoLogsMapper.updateById(taskLog);task = new Task();BeanUtils.copyProperties(taskLog, task);task.setExecuteTime(taskLog.getExecuteTime().getTime());} catch (Exception e) {log.error("task cancel exception taskId={}", taskId);}return task;
}
拉取任务
public Task poll(Integer type, Integer priority) {Task task = null;try {String key = type+"_"+priority;// 从redis中拉取数据 popString task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);if(StringUtils.isNotBlank(task_json)){task = JSON.parseObject(task_json, Task.class);// 修改数据库信息updateDb(task.getTaskId(), ScheduleConstants.EXECUTED);}} catch (Exception e) {e.printStackTrace();log.error("poll task exception");}return task;
}
未来数据定时刷新
@Scheduled(cron = "0 */1 * * * ?")
public void refresh()
{log.info("未来数据定时刷新---定时任务");// 获取所有未来数据的集合keySet<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");for (String futureKey : futureKeys) {// 获取当前数据的key topicString topiKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];// 按照key和分值查询符合条件的数据Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());// 同步数据if(!tasks.isEmpty()){cacheService.refreshWithPipeline(futureKey, topiKey, tasks);log.info("成功的将"+futureKey+"刷新到了"+topiKey);}}
}
redis解决集群抢占
public String tryLock(String name, long expire) {name = name + "_lock";String token = UUID.randomUUID().toString();RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();RedisConnection conn = factory.getConnection();try {//参考redis命令://set key value [EX seconds] [PX milliseconds] [NX|XX]Boolean result = conn.set(name.getBytes(),token.getBytes(),Expiration.from(expire, TimeUnit.MILLISECONDS),RedisStringCommands.SetOption.SET_IF_ABSENT //NX);if (result != null && result)return token;} finally {RedisConnectionUtils.releaseConnection(conn, factory,false);}return null;
}// use
String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000*30);
if(StringUtils.isNotBlank(token)) {...
}
数据库定时同步
@PostConstruct // 初始化方法
@Scheduled(cron = "0 */5 * * * ?")
public void reloadData()
{// 清理缓存中的数据clearCache();// 查询符合条件的任务 小于未来5分钟的数据Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);List<Taskinfo> taskinfoList = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime()));// 把任务添加到redisif(taskinfoList != null && taskinfoList.size() > 0){for (Taskinfo taskinfo : taskinfoList) {Task task = new Task();BeanUtils.copyProperties(taskinfo, task);task.setExecuteTime(taskinfo.getExecuteTime().getTime());addTaskToCache(task);}}log.info("数据库的数据同步到了redis");
}/*** 清楚缓存中的数据*/
public void clearCache()
{Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");Set<String> topicKeys = cacheService.scan(ScheduleConstants.TOPIC + "*");cacheService.delete(futureKeys);cacheService.delete(topicKeys);
}
远程接口
@RestController
public class ScheduleClient implements IScheduleClient {@Autowiredprivate TaskService taskService;/*** 添加任务*/@PostMapping("/api/v1/task/add")public ResponseResult addTask(Task task){return ResponseResult.okResult(taskService.addTask(task));}/*** 取消任务*/@GetMapping("/api/v1/task/{taskId}")public ResponseResult cancelTask(@PathVariable("taskId") Long taskId){return ResponseResult.okResult(taskService.cancelTask(taskId));}/*** 按照类型和优先级拉取任务*/@GetMapping("/api/v1/task/{type}/{priority}")public ResponseResult poll(@PathVariable("type") Integer type, @PathVariable("priority") Integer priority){return ResponseResult.okResult(taskService.poll(type, priority));}
}
发布文章添加延迟任务
- 枚举类
@Getter @AllArgsConstructor public enum TaskTypeEnum {NEWS_SCAN_TIME(1001, 1,"文章定时审核"),REMOTEERROR(1002, 2,"第三方接口调用失败,重试");private final int taskType; //对应具体业务private final int priority; //业务不同级别private final String desc; //描述信息 }
- ProtoStuff依赖
<dependency><groupId>io.protostuff</groupId><artifactId>protostuff-core</artifactId><version>1.6.0</version> </dependency><dependency><groupId>io.protostuff</groupId><artifactId>protostuff-runtime</artifactId><version>1.6.0</version> </dependency>
- 添加任务
@Override @Async public void addNewsToTask(Integer id, Date publishTime) {Task task = new Task();task.setExecuteTime(publishTime.getTime());task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());WmNews wmNews = new WmNews();wmNews.setId(id);task.setParameters(ProtostuffUtil.serialize(wmNews));scheduleClient.addTask(task); }
拉取任务
@Scheduled(fixedDelay = 1000)
@Override
public void scanNewsByTask() {log.info("消费任务, 审核文章");ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());if(responseResult.getCode().equals(200) && responseResult.getData() != null){Task task = JSON.parseObject(JSON.toJSONString(responseResult.getData()), Task.class);WmNews wmNews = ProtostuffUtil.deserialize(task.getParameters(), WmNews.class);wmNewsAutoScanService.autoScanWmNews(wmNews.getId());}
}
总结
redis延迟服务
- 为什么任务需要存储在数据库中?
- 延迟任务是一个通用的服务,任何有延迟需求的任务都可以调用该服务, 内存数据库的存储是有限的,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑
- 为什么使用redis中的两种数据类型, list和zset?
- list存储立即执行的任务, zset存储未来的数据
- 任务量过大以后, zset的性能会下降, 将立即执行的任务存储在list中, 可以提高性能( list的lpush是O(1), zset的zadd是O(log(n)) )
- 添加zset数据时, 为什么需要预加载?
- 如果任务数据特别大, 为了防止阻塞, 只需要把未来几分钟要执行的数据放入缓存即可, 是一种优化的形式.
数据库
- 数据库准备
- mysql中, blob是一个二进制大型对象, 是一个可以存储大量数据的容器, longblob最大存储4G
- 数据库锁
- 悲观锁(Pessimistic Lock): 拿数据的时候认为别人会修改, 所以每次拿取数据都会上锁
- 乐观锁(Optimistic Lock): 拿数据的时候认为别人不会修改, 所以不会上锁, 但是在更新的时候会判断一下在此期间别人有没有去更新数据, 可以使用版本号等机制
未来数据定时刷新
redis key值匹配
-
keys模糊匹配
keys的模糊匹配功能很方便也很强大,但是在生产环境需要慎用! 开发中使用keys的模糊匹配却发现redis的CPU使用率极高,所以公司的redis生产环境将keys命令禁用了!redis是单线程,会被堵塞.
-
scan
SCAN 命令是一个基于游标的迭代器,SCAN命令每次被调用之后,都会向用户返回一个新的游标,用户在下次迭代时需要使用这个新游标作为SCAN命令的游标参数,以此来延续之前的迭代过程。
来源
黑马程序员. 黑马头条
Gitee
https://gitee.com/yu-ba-ba-ba/leadnews