【黑马头条】-day05延迟队列文章发布审核-Redis-zSet实现延迟队列-Feign远程调用


文章目录

  • 昨日回顾
  • 今日内容
  • 1 延迟任务
    • 1.1 概述
    • 1.2 技术对比
      • 1.2.1 DelayQueue
      • 1.2.2 RabbitMQ
      • 1.2.3 Redis实现
      • 1.2.4 总结
  • 2 redis实现延迟任务
    • 2.0 实现思路
    • 2.1 思考
    • 2.2 初步配置实现
      • 2.2.1 导入heima-leadnews-schedule模块
      • 2.2.2 在Nacos注册配置管理leadnews-schedule
      • 2.2.3 导入表结构
      • 2.2.4 根据表结构导入实体类及其mapper
      • 2.2.5 表结构中的乐观锁
        • 2.2.5.1 在启动类中加入乐观锁的拦截器
      • 2.2.6 安装redis
      • 2.2.7 在项目中集成redis
        • 2.2.7.1 导入redis依赖
        • 2.2.7.2 为redis添加连接配置
        • 2.2.7.3 拷贝工具类CacheService
        • 2.2.7.4 将CacheService注册到spring自动配置
        • 2.2.7.5 测试List
        • 2.2.7.6 测试Zset
    • 2.3 添加任务
      • 2.3.1 导入task类
      • 2.3.2 创建TaskService
      • 2.3.3 测试
    • 2.4 取消任务
      • 2.4.1 Service
      • 2.4.2 测试
    • 2.5 拉取任务
      • 2.5.1 Service
      • 2.5.2 测试
    • 2.6 定时刷新
      • 2.6.1 如何获取zset中所有的key?
      • 2.6.2 数据如何同步?
      • 2.6.3 Redis管道
      • 2.6.4 zSet和List数据同步实现
      • 2.6.5 开启定时任务
      • 2.6.6 分布式下的Schedule
      • 2.6.7 Redis分布式锁
      • 2.6.8 数据库和Redis的同步
    • 2.7 延迟队列对外接口
      • 2.7.1 IScheduleClinet接口
      • 2.7.2 在微服务中实现类
    • 2.8 发布文章集成延迟队列
      • 2.8.1 添加askTypeEnum类枚举类
      • 2.8.2 Task的参数序列化
      • 2.8.3 实现文章发布集成接口及实现类
      • 2.8.4 修改文章发布逻辑
      • 2.8.5 启动测试
    • 2.9 消费任务审核文章
      • 2.9.1 综合测试


昨日回顾

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

今日内容

在这里插入图片描述

1 延迟任务

1.1 概述

在这里插入图片描述

1.2 技术对比

1.2.1 DelayQueue

在这里插入图片描述

1.2.2 RabbitMQ

在这里插入图片描述

1.2.3 Redis实现

在这里插入图片描述

1.2.4 总结

在这里插入图片描述

2 redis实现延迟任务

2.0 实现思路

在这里插入图片描述

2.1 思考

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2.2 初步配置实现

在这里插入图片描述

2.2.1 导入heima-leadnews-schedule模块

在这里插入图片描述

在这里插入图片描述

2.2.2 在Nacos注册配置管理leadnews-schedule

spring:redis:host: 192.168.204.129password: leadnewsport: 6379datasource:driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://localhost:3306/leadnews_schedule?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=falseusername: rootpassword: 123sjbsjb# 设置Mapper接口所对应的XML文件位置,如果你在Mapper接口中有自定义方法,需要进行该配置
mybatis-plus:mapper-locations: classpath*:mapper/*.xml# 设置别名包扫描路径,通过该属性可以给包中的类注册别名type-aliases-package: com.heima.model.schedule.pojosminio:accessKey: miniosecretKey: minio123bucket: leadnewsendpoint: http://192.168.204.129:9000readPath: http://192.168.204.129:9000

在这里插入图片描述

2.2.3 导入表结构

在这里插入图片描述

在这里插入图片描述

2.2.4 根据表结构导入实体类及其mapper

导入heima-leadnews-model模块下com.heima.model.schedule下导入两个Taskinfo和TaskinfoLogs实体类

@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {private static final long serialVersionUID = 1L;/*** 任务id*/@TableId(type = IdType.ID_WORKER)private Long taskId;/*** 执行时间*/@TableField("execute_time")private Date executeTime;/*** 参数*/@TableField("parameters")private byte[] parameters;/*** 优先级*/@TableField("priority")private Integer priority;/*** 任务类型*/@TableField("task_type")private Integer taskType;}
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {private static final long serialVersionUID = 1L;/*** 任务id*/@TableId(type = IdType.ID_WORKER)private Long taskId;/*** 执行时间*/@TableField("execute_time")private Date executeTime;/*** 参数*/@TableField("parameters")private byte[] parameters;/*** 优先级*/@TableField("priority")private Integer priority;/*** 任务类型*/@TableField("task_type")private Integer taskType;/*** 版本号,用乐观锁*/@Versionprivate Integer version;/*** 状态 0=int 1=EXECUTED 2=CANCELLED*/@TableField("status")private Integer status;}

对应mapper

@Mapper
public interface TaskinfoLogsMapper extends BaseMapper<TaskinfoLogs> {}
@Mapper
public interface TaskinfoMapper extends BaseMapper<Taskinfo> {public List<Taskinfo> queryFutureTime(@Param("taskType")int taskType, @Param("priority")int priority, @Param("future")Date future);
}

TaskinfoMapper对应的mybatis的xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.heima.schedule.mapper.TaskinfoMapper"><select id="queryFutureTime" resultType="com.heima.model.schedule.pojos.Taskinfo">select *from taskinfowhere task_type = #{taskType}and priority = #{priority}and execute_time <![CDATA[<]]> #{future,javaType=java.util.Date}</select></mapper>

2.2.5 表结构中的乐观锁

@Version
private Integer version;

在这里插入图片描述

在这里插入图片描述

2.2.5.1 在启动类中加入乐观锁的拦截器

在heima-leadnews-schedule模块下的启动类中加入乐观锁的拦截器

@SpringBootApplication
@MapperScan("com.heima.schedule.mapper")
public class ScheduleApplication {public static void main(String[] args) {SpringApplication.run(ScheduleApplication.class,args);}@Beanpublic MybatisPlusInterceptor optimisticLockerInterceptor(){MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());return interceptor;}
}

2.2.6 安装redis

移除已有redis

docker rm redis

创建新的redis容器

docker run -d --name redis --restart=always -p 6379:6379 redis --requirepass "leadnews"

密码leadnews

在这里插入图片描述

2.2.7 在项目中集成redis

2.2.7.1 导入redis依赖

在heima-leadnews-common模块中添加redis依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- redis依赖commons-pool 这个依赖一定要添加 -->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency>
2.2.7.2 为redis添加连接配置

在nacos中的leadnews-schedule配置中心为redis添加配置

spring:redis:host: 192.168.204.129password: leadnewsport: 6379

在这里插入图片描述

2.2.7.3 拷贝工具类CacheService

拷贝工具类CacheService到heima-leadnews-common的com.heima.common.redis下

在这里插入图片描述

2.2.7.4 将CacheService注册到spring自动配置

在这里插入图片描述

2.2.7.5 测试List

在heima-leadnews-schedule中创建RedisTest测试类

在这里插入图片描述

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {@Autowiredprivate CacheService cacheService;@Testpublic void testList() {//在List的左边添加元素cacheService.lLeftPush("list_001", "hello,redis");}
}

在这里插入图片描述

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
public class RedisTest {@Autowiredprivate CacheService cacheService;@Testpublic void testList() {//在List的左边添加元素//cacheService.lLeftPush("list_001", "hello,redis");//在List的右边获取元素并删除String value = cacheService.lRightPop("list_001");System.out.println(value);}
}

在这里插入图片描述

查看redis发现已经没有数据了

在这里插入图片描述

2.2.7.6 测试Zset
@Test
public void testZset() {//添加元素到Zset中,按照分值cacheService.zAdd("zset_key_001", "hello zset 001", 1000);cacheService.zAdd("zset_key_002", "hello zset 002", 8888);cacheService.zAdd("zset_key_003", "hello zset 003", 7777);cacheService.zAdd("zset_key_004", "hello zset 004", 99999);//按照分值获取元素
}

在这里插入图片描述

获取前三条数据

@Test
public void testZset() {/*//添加元素到Zset中,按照分值cacheService.zAdd("zset_key_001", "hello zset 001", 1000);cacheService.zAdd("zset_key_001", "hello zset 002", 8888);cacheService.zAdd("zset_key_001", "hello zset 003", 7777);cacheService.zAdd("zset_key_001", "hello zset 004", 99999);*///按照分值获取元素Set<String> zset_key_001 = cacheService.zRangeByScore("zset_key_001", 0, 8888);System.out.println(zset_key_001);
}

在这里插入图片描述

2.3 添加任务

在这里插入图片描述

2.3.1 导入task类

@Data
public class Task implements Serializable {/*** 任务id*/private Long taskId;/*** 类型*/private Integer taskType;/*** 优先级*/private Integer priority;/*** 执行id*/private long executeTime;/*** task参数*/private byte[] parameters;}

2.3.2 创建TaskService

在heima-leadnews-schedule模块下创建com.heima.schedule.service.TaskService接口及实现

public interface TaskService {/*** 添加延迟任务* @param task* @return*/long addTask(Task task);
}

实现包含

1.添加任务到数据库中

2.添加任务到redis中

2.1 如果任务的执行时间小于当前时间,直接执行任务

2.2 如果任务的执行时间大于当前时间&&小于等于预设时间,添加到延迟队列中

@Service
@Slf4j
public class TaskServiceImpl implements TaskService {/*** 添加延迟任务* @param task* @return*/@Overridepublic long addTask(Task task) {//1.添加任务到数据库中boolean success= addTaskToDB(task);//2.添加任务到redis中if(success){addTaskToRedis(task);}return task.getTaskId();}@Autowiredprivate CacheService cacheService;/*** 添加任务到redis中* @param task*/private void addTaskToRedis(Task task) {String key = task.getTaskType()+"_"+task.getPriority();//获取预设时间,5分钟后Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE,5);long nextSchedule = calendar.getTimeInMillis();//2.1 如果任务的执行时间小于当前时间,直接执行任务if(task.getExecuteTime()<=System.currentTimeMillis()){cacheService.lLeftPush(ScheduleConstants.TOPIC+key, JSON.toJSONString(task));}//2.2 如果任务的执行时间大于当前时间&&小于等于预设时间,添加到延迟队列中else if(task.getExecuteTime()>System.currentTimeMillis()&&task.getExecuteTime()<=nextSchedule){cacheService.zAdd(ScheduleConstants.FUTURE+key,JSON.toJSONString(task),task.getExecuteTime());}}@Autowiredprivate TaskinfoMapper taskinfoMapper;@Autowiredprivate TaskinfoLogsMapper taskinfoLogsMapper;/*** 添加任务到数据库中* @param task* @return*/private boolean addTaskToDB(Task task) {boolean flag = false;try {//1.保存任务表Taskinfo taskinfo = new Taskinfo();BeanUtils.copyProperties(task,taskinfo);taskinfo.setExecuteTime(new Date(task.getExecuteTime()));taskinfoMapper.insert(taskinfo);//设置Task的idtask.setTaskId(taskinfo.getTaskId());//2.保存任务日志表TaskinfoLogs taskinfoLogs = new TaskinfoLogs();BeanUtils.copyProperties(taskinfo,taskinfoLogs);taskinfoLogs.setVersion(1);taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);taskinfoLogsMapper.insert(taskinfoLogs);flag = true;} catch (Exception e) {log.error("添加任务到数据库失败",e);e.printStackTrace();}return flag;}
}

还有个常量类放入heima-leadnews-common模块下的com.heima.common.constant包下

package com.heima.common.constants;public class ScheduleConstants {//task状态public static final int SCHEDULED=0;   //初始化状态public static final int EXECUTED=1;       //已执行状态public static final int CANCELLED=2;   //已取消状态public static String FUTURE="future_";   //未来数据key前缀public static String TOPIC="topic_";     //当前数据key前缀
}

2.3.3 测试

public class TaskServiceImpl implements TaskService点击TaskService,CONTROL+SHIFT+T创建测试

在这里插入图片描述

@SpringBootTest(classes = ScheduleApplication.class)
@RunWith(SpringRunner.class)
@Slf4j
class TaskServiceImplTest {@Autowiredprivate TaskService taskService;@Testvoid addTask() {Task task = new Task();task.setTaskType(100);task.setPriority(50);task.setExecuteTime(new Date().getTime()+2000);task.setParameters("task test".getBytes());long taskId = taskService.addTask(task);log.info("taskId:{}", taskId);}
}

在这里插入图片描述

显示如此

2.4 取消任务

在这里插入图片描述

2.4.1 Service

boolean deleteTask(Long taskId);
/*** 删除任务* @param taskId* @return*/
@Override
public boolean deleteTask(Long taskId) {boolean flag = false;//1.删除数据库中的任务int success = taskinfoMapper.deleteById(taskId);if(success==0){return flag;}try {//2.更新日志状态TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);taskinfoLogs.setStatus(ScheduleConstants.CANCELLED);taskinfoLogsMapper.updateById(taskinfoLogs);//3.删除redis中的任务Task task = new Task();BeanUtils.copyProperties(taskinfoLogs,task);task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());String key = task.getTaskType()+"_"+task.getPriority();if(task.getExecuteTime()<=System.currentTimeMillis()) {cacheService.lRemove(ScheduleConstants.TOPIC + key, 0, JSON.toJSONString(task));}else{cacheService.zRemove(ScheduleConstants.FUTURE + key, JSON.toJSONString(task));}flag = true;} catch (Exception e) {log.error("删除任务失败",e);e.printStackTrace();}return flag;
}

2.4.2 测试

@Test
void deleteTask() {boolean flag = taskService.deleteTask(1773909243989106689);log.info("flag:{}", flag);
}

在这里插入图片描述

2.5 拉取任务

在这里插入图片描述

2.5.1 Service

Task poll(int type, int priority);
/*** 按照类型和优先级拉取任务* @return*/
@Override
public Task poll(int type,int priority) {Task task = null;try {String key = type+"_"+priority;String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);if(StringUtils.isNotBlank(task_json)){task = JSON.parseObject(task_json, Task.class);//更新数据库信息updateDb(task.getTaskId(),ScheduleConstants.EXECUTED);}}catch (Exception e){e.printStackTrace();log.error("poll task exception");}return task;
}

2.5.2 测试

@Test
void testPoll() {Task task = taskService.poll(100, 50);log.info("task:{}", task);
}

拉取成功

在这里插入图片描述

2.6 定时刷新

在这里插入图片描述

在这里插入图片描述

2.6.1 如何获取zset中所有的key?

在这里插入图片描述

在这里插入图片描述

@Test
public void testKeys() {Set<String> keys = cacheService.keys(ScheduleConstants.FUTURE + "*");System.out.println("方式一:");System.out.println(keys);Set<String> scan = cacheService.scan(ScheduleConstants.FUTURE + "*");System.out.println("方式二:");System.out.println(scan);
}

2.6.2 数据如何同步?

在这里插入图片描述

2.6.3 Redis管道

在这里插入图片描述

在这里插入图片描述

//耗时6151
@Test
public  void testPiple1(){long start =System.currentTimeMillis();for (int i = 0; i <10000 ; i++) {Task task = new Task();task.setTaskType(1001);task.setPriority(1);task.setExecuteTime(new Date().getTime());cacheService.lLeftPush("1001_1", JSON.toJSONString(task));}System.out.println("耗时"+(System.currentTimeMillis()- start));
}@Test
public void testPiple2(){long start  = System.currentTimeMillis();//使用管道技术List<Object> objectList = cacheService.getstringRedisTemplate().executePipelined(new RedisCallback<Object>() {@Nullable@Overridepublic Object doInRedis(RedisConnection redisConnection) throws DataAccessException {for (int i = 0; i <10000 ; i++) {Task task = new Task();task.setTaskType(1001);task.setPriority(1);task.setExecuteTime(new Date().getTime());redisConnection.lPush("1001_1".getBytes(), JSON.toJSONString(task).getBytes());}return null;}});System.out.println("使用管道技术执行10000次自增操作共耗时:"+(System.currentTimeMillis()-start)+"毫秒");
}

使用管道技术执行10000次自增操作共耗时:2481毫秒

2.6.4 zSet和List数据同步实现

Cron表达式 @Scheduled(cron="0 */1 * * * ?")

在TaskService中添加方法

public void refresh()
/*** 定时刷新队列,每分钟刷新*/
@Override
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {log.info(System.currentTimeMillis() / 1000 + "执行了定时任务");// 获取所有未来数据集合的key值Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*for (String futureKey : futureKeys) { // future_250_250String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];//获取该组key下当前需要消费的任务数据Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());if (!tasks.isEmpty()) {//将这些任务数据添加到消费者队列中cacheService.refreshWithPipeline(futureKey, topicKey, tasks);log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");}}
}

新增测试方法

public void addTaskNew() {for (int i = 0; i < 5; i++) {Task task = new Task();task.setTaskType(100 + i);task.setPriority(50);task.setParameters("task test".getBytes());task.setExecuteTime(new Date().getTime() + 500 * i);long taskId = taskService.addTask(task);}
} 

2.6.5 开启定时任务

在启动类中添加@EnableScheduling

@SpringBootApplication
@MapperScan("com.heima.schedule.mapper")
@EnableScheduling
public class ScheduleApplication {public static void main(String[] args) {SpringApplication.run(ScheduleApplication.class,args);}@Beanpublic MybatisPlusInterceptor optimisticLockerInterceptor(){MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new OptimisticLockerInnerInterceptor());return interceptor;}
}

启动ScheduleApplication

未来任务已经刷新

在这里插入图片描述

2.6.6 分布式下的Schedule

再启动一个ScheduleApplication端口为51702

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2.6.7 Redis分布式锁

在这里插入图片描述

在heima-leadnews-common的工具类com.heima.common.redis.CacheService中添加方法

/*** 加锁** @param name* @param expire* @return*/
public String tryLock(String name, long expire) {name = name + "_lock";String token = UUID.randomUUID().toString();RedisConnectionFactory factory = stringRedisTemplate.getConnectionFactory();RedisConnection conn = factory.getConnection();try {//参考redis命令://set key value [EX seconds] [PX milliseconds] [NX|XX]Boolean result = conn.set(name.getBytes(),token.getBytes(),Expiration.from(expire, TimeUnit.MILLISECONDS),RedisStringCommands.SetOption.SET_IF_ABSENT //NX);if (result != null && result)return token;} finally {RedisConnectionUtils.releaseConnection(conn, factory,false);}return null;
}

在定时刷新前加上锁操作

@Override
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);if (StringUtils.isBlank(token)) {log.info(System.currentTimeMillis() / 1000 + "执行了定时任务");// 获取所有未来数据集合的key值Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_*for (String futureKey : futureKeys) { // future_250_250String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];//获取该组key下当前需要消费的任务数据Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());if (!tasks.isEmpty()) {//将这些任务数据添加到消费者队列中cacheService.refreshWithPipeline(futureKey, topicKey, tasks);log.info("成功的将" + futureKey + "下的当前需要执行的任务数据刷新到" + topicKey + "下");}}}
}

在这里插入图片描述

在这里插入图片描述

2.6.8 数据库和Redis的同步

在这里插入图片描述

在com.heima.schedule.service.impl.TaskServiceImpl中添加新的reloadData方法,数据库任务定时同步到redis中

@PostConstruct是开机自动同步

@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct
public void reloadData() {clearCache();log.info("数据库数据同步到缓存");Calendar calendar = Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);//查看小于未来5分钟的所有任务List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime,calendar.getTime()));if(allTasks != null && allTasks.size() > 0){for (Taskinfo taskinfo : allTasks) {Task task = new Task();BeanUtils.copyProperties(taskinfo,task);task.setExecuteTime(taskinfo.getExecuteTime().getTime());addTaskToRedis(task);}}
}private void clearCache(){// 删除缓存中未来数据集合和当前消费者队列的所有keySet<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_cacheService.delete(futurekeys);cacheService.delete(topickeys);
}

在这里插入图片描述

删除redis中数据,重新启动服务

在这里插入图片描述

同步成功

2.7 延迟队列对外接口

在这里插入图片描述

2.7.1 IScheduleClinet接口

对外通过Fegin进行接口调用

在heima-leadnews-feign-api模块下创建com.heima.apis.schedule包

再创建接口IScheduleClinet接口,将com.heima.schedule.service.TaskService接口的东西复制过来

@FeignClient(value = "leadnews-schedule")
public interface IScheduleClient {/*** 添加延迟任务* @param task* @return*/@PostMapping("/api/v1/task/add")public ResponseResult addTask(@RequestBody Task task);/*** 删除任务* @param taskId* @return*/@GetMapping("/api/v1/task/{taskId}")public ResponseResult cancelTask(@PathVariable("taskId") long taskId);/*** 按照类型和优先级拉取* @param type* @param priority* @return*/@GetMapping("/api/v1/{type}/{priority}")public ResponseResult poll(@PathVariable("type")int type, @PathVariable("priority")int priority);
}

2.7.2 在微服务中实现类

在heima-leadnews-schedule模块下创建com.heima.schedule.feign.ScheduleClient实现类(充当Controller)

@RestController
public class ScheduleClient implements IScheduleClient {@Autowiredprivate TaskService taskService;/*** 添加延迟任务* @param task* @return*/@PostMapping("/api/v1/task/add")@Overridepublic ResponseResult addTask(@RequestBody Task task){return ResponseResult.okResult(taskService.addTask(task));}/*** 删除任务* @param taskId* @return*/@GetMapping("/api/v1/task/{taskId}")@Overridepublic ResponseResult cancelTask(@PathVariable("taskId") long taskId){return ResponseResult.okResult(taskService.cancelTask(taskId));}/*** 按照类型和优先级拉取* @param type* @param priority* @return*/@GetMapping("/api/v1/task/{type}/{priority}")@Overridepublic ResponseResult poll(@PathVariable("type")int type, @PathVariable("priority")int priority){return ResponseResult.okResult(taskService.poll(type, priority));}
}

2.8 发布文章集成延迟队列

在这里插入图片描述

2.8.1 添加askTypeEnum类枚举类

定义枚举类com.heima.model.common.enums.TaskTypeEnum类

@Getter
@AllArgsConstructor
public enum TaskTypeEnum {NEWS_SCAN_TIME(1001, 1,"文章定时审核"),REMOTEERROR(1002, 2,"第三方接口调用失败,重试");private final int taskType; //对应具体业务private final int priority; //业务不同级别private final String desc; //描述信息
}

2.8.2 Task的参数序列化

Task的参数是一个二进制数据,所以需要序列化

引入序列化工具

在这里插入图片描述

导入两个工具类

在这里插入图片描述

导入依赖

<dependency><groupId>io.protostuff</groupId><artifactId>protostuff-core</artifactId><version>1.6.0</version>
</dependency><dependency><groupId>io.protostuff</groupId><artifactId>protostuff-runtime</artifactId><version>1.6.0</version>
</dependency>

2.8.3 实现文章发布集成接口及实现类

添加com.heima.wemedia.service.WmNewsTaskService接口

public interface WmNewsTaskService {/*** 添加文章自动发布任务* @param id 文章id* @param publishTime 发布时间*/public void addNewsToTask(Integer id, Date publishTime);
}

实现类com.heima.wemedia.service.impl.WmNewsTaskServiceImpl

@Service
@Slf4j
public class WmNewsTaskServiceImpl implements WmNewsTaskService {@Autowiredprivate IScheduleClient scheduleClient;@Overridepublic void addNewsToTask(Integer id, Date publishTime) {log.info("添加文章自动发布任务,文章id:{},发布时间:{}",id,publishTime);Task task = new Task();task.setExecuteTime(publishTime.getTime());task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());WmNews wmNews = new WmNews();wmNews.setId(id);task.setParameters(ProtostuffUtil.serialize(wmNews));scheduleClient.addTask(task);log.info("添加文章自动发布任务成功");}
}

2.8.4 修改文章发布逻辑

修改com.heima.wemedia.service.impl.WmNewsServiceImpl逻辑

第五步审核时,把任务先放到队列中,放在队列中再通过拉取任务进行审核

@Autowired
private WmNewAutoScanService wmNewAutoScanService;
@Autowired
private WmNewsTaskService wmNewsTaskService;
@Override
public ResponseResult submitNews(WmNewsDto wmNewsDto) {// 0.参数检查if(wmNewsDto == null||wmNewsDto.getContent()==null){return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//1. 保存或修改文章WmNews wmNews = new WmNews();BeanUtils.copyProperties(wmNewsDto,wmNews);//1.1 封面if(wmNewsDto.getImages()!=null&& wmNewsDto.getImages().size()>0){String imageStr = StringUtils.join(wmNewsDto.getImages(), ",");wmNews.setImages(imageStr);}//1.2 如果封面为自动-1,则需要手动设置封面规则if(wmNewsDto.getType().equals(WemediaConstants.WM_NEWS_TYPE_AUTO)){wmNews.setType(null);}saveOrUpdateWmNews(wmNews);//2.判断是否为草稿,如果为草稿结束当前方法if(wmNews.getStatus().equals(WmNews.Status.NORMAL.getCode())){return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}//3.不是草稿,保存文章内容与图片素材的关系//3.1 获取文章内容的图片素材List<String> imageList=extractUrlInfo(wmNewsDto.getContent());saveRelativeInfoForContent(imageList,wmNews.getId());//4.不是草稿,保存文章封面图片与图片素材的关系saveRelativeInfoForCover(wmNewsDto,wmNews,imageList);//5.审核文章//wmNewAutoScanService.autoScanMediaNews(wmNews.getId());//将文章id和发布时间添加到任务中wmNewsTaskService.addNewsToTask(wmNews.getId(),wmNewsDto.getPublishTime());return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}

2.8.5 启动测试

在这里插入图片描述

在这里插入图片描述

2.9 消费任务审核文章

修改com.heima.wemedia.service.impl.WmNewsServiceImpl逻辑

@Autowired
private WmNewAutoScanService wmNewAutoScanService;
/*** 消费任务,审核文章*/
@Override
@Async
@Scheduled(fixedRate = 1000)
public void scanNewsByTask() {log.info("开始执行文章自动审核任务");ResponseResult responseResult = scheduleClient.poll(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType(), TaskTypeEnum.NEWS_SCAN_TIME.getPriority());if(responseResult.getCode().equals(200)&&responseResult.getData()!=null){log.info("task:{}",responseResult.getData());String jsonTask = JSON.toJSONString(responseResult.getData());Task task = JSON.parseObject(jsonTask, Task.class);//逆序列化任务参数拿到idWmNews wmNews = ProtostuffUtil.deserialize(task.getParameters(), WmNews.class);wmNewAutoScanService.autoScanMediaNews(wmNews.getId());}
}

这个方法并不会被调用,只需要按照一定频率拉取任务

因此添加@Scheduled(fixedRate = 1000)1s中拉取一次

在这里插入图片描述

同时需要在WediaAppilcation启动类添加@EnableScheduling

@SpringBootApplication
@EnableDiscoveryClient
@MapperScan("com.heima.wemedia.mapper")
@EnableFeignClients(basePackages = "com.heima.apis")
@EnableAsync
@EnableScheduling
public class WemediaApplication {public static void main(String[] args) {SpringApplication.run(WemediaApplication.class,args);}@Beanpublic MybatisPlusInterceptor mybatisPlusInterceptor() {MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));return interceptor;}@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(messageConverter);return rabbitTemplate;}
}

2.9.1 综合测试

发布一个即时任务

在这里插入图片描述

发布一个延迟任务

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

查看控制台
在这里插入图片描述

25分即将被消费

在这里插入图片描述

状态为1表示消费成功!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/596139.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Leetcode】1. 两数之和

文章目录 题目思路代码复杂度分析时间复杂度空间复杂度 结果总结 题目 题目链接&#x1f517; 给定一个整数数组 n u m s nums nums 和一个整数目标值 t a r g e t target target&#xff0c;请你在该数组中找出 和为目标值 t a r g e t target target 的那 两个 整数&…

序列超图的下一项推荐 笔记

1 Title Next-item Recommendation with Sequential Hypergraphs&#xff08;Jianling Wang、Kaize Ding、Liangjie Hong、Huan Liu、James Caverlee&#xff09;【SIGIR 2020】 2 Conclusion This study explores the dynamic meaning of items in realworld scenarios and p…

JAVA IO流学习

File类&#xff1a; File类是java.io包中很重要的一个类 File类的对象可以代表一个文件或者目录&#xff0c;可以修改文件大小、文件最后修改日期、文件名等 File对象不能操作文件的具体数据&#xff0c;即不能对文件进行读和写的操作 File的构造方法&#xff1a; File&…

并查集学习(836. 合并集合 + 837. 连通块中点的数量)

//得先加集合个数再合并&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; 核心代码&#xff1a; int find(int x){//返回父节点if(x ! p[x]) {p[x] find(p[x]);//路径压缩 } //孩子不等于爸爸&#xff0c;就…

Pytorch转onnx

pytorch 转 onnx 模型需要函数 torch.onnx.export。 def export(model: Union[torch.nn.Module, torch.jit.ScriptModule, torch.jit.ScriptFunction],args: Union[Tuple[Any, ...], torch.Tensor],f: Union[str, io.BytesIO],export_params: bool True,verbose: bool False…

【QT+QGIS跨平台编译】056:【pdal_json_schema+Qt跨平台编译】(一套代码、一套框架,跨平台编译)

点击查看专栏目录 文章目录 一、pdal_json_schema介绍二、pdal下载三、文件分析四、pro文件五、编译实践一、pdal_json_schema介绍 pdal_json_schema 是与 PDAL(Point Data Abstraction Library)相关的 JSON 模式文件。PDAL 是一个用于处理和分析点云数据的开源库。JSON 模式…

DHCP-PXE

Dynamic Host Configuration Protocol 动态主机配置协议 1.Selinux 调试为Permission 防火墙配置 搭建DHCP的主机必须有一个静态地址&#xff0c;提前配置好 安装DHCP软件 服务名为dhcpd DHCP地址分配四次会话&#xff0c; DISCOVERY发现 OFFER 提供 REQUEST 回应 A…

5G网络架构及技术(二):OFDM一

ToDo: 等把这些讲义看完后得单开一个文章整理思维导图   该部分由于内容比较重要&#xff0c;OFDM是5G物理层的基础&#xff0c;但学习时直接跳到5G OFDM去看它的那些参数设置感觉没什么意义&#xff0c;还得从发展的角度进行学习&#xff0c;先从最先用到OFDM的WiFi协议开始…

WCH恒沁单片机-CH32V307学习记录2----FreeRTOS移植

RISC-V 单片机 FreeRTOS 移植 前面用了 5 篇博客详细介绍了 FreeRTOS 在 ARM Cortex-M3 MCU 上是如何运行的。 FreeRTOS从代码层面进行原理分析系列 现在我直接用之前的 RISC-V MCU 开发板子&#xff08;CH32V307VCT6&#xff09;再次对 FreeRTOS 进行移植&#xff0c;其实也…

【C语言自定义类型之----结构体,联合体和枚举】

一.结构体 1.结构体类型的声明 srruct tag {nemer-list;//成员列表 }varible-list;//变量列表结构体在声明的时候&#xff0c;可以不完全声明。 例如&#xff1a;描述一个学生 struct stu {char name[20];//名字int age;//年龄char sex[20];//性别 };//分号不能省略2.结构体…

C语言实现快速排序算法

1. 什么是快速排序算法 快速排序的核心思想是通过分治法&#xff08;Divide and Conquer&#xff09;来实现排序。 算法的基本步骤是: 1. 选择一个基准值&#xff08;通常是数组中的某个元素&#xff09;&#xff0c;将数组分成两部分&#xff0c;使得左边的部分所有元素都小于…

Open CASCADE学习|在给定的TopoDS_Shape中查找与特定顶点 V 对应的TopoDS_Edge编号

enum TopAbs_ShapeEnum{TopAbs_COMPOUND,TopAbs_COMPSOLID,TopAbs_SOLID,TopAbs_SHELL,TopAbs_FACE,TopAbs_WIRE,TopAbs_EDGE,TopAbs_VERTEX,TopAbs_SHAPE}; 这段代码定义了一个名为 TopAbs_ShapeEnum 的枚举类型&#xff0c;它包含了表示不同几何形状类型的常量。这些常量通常…