1、课程发布
为了提高网站的速度需要将课程信息进行缓存,并且要将课程信息加入索引库方便搜索,下图显示了课程发布后课程信息的流转情况:
1、向内容管理数据库的课程发布表存储课程发布信息,更新课程基本信息表中发布状态为已发布。
2、向Redis存储课程缓存信息。
3、向Elasticsearch存储课程索引信息。
4、请求分布文件系统存储课程静态化页面(即html页面),实现快速浏览课程详情页面。
课程发布表的数据来源于课程预发布表,它们的结构基本一样,只是课程发布表中的状态是课程发布状态,如下图:
redis中的课程缓存信息是将课程发布表中的数据转为json进行存储。
elasticsearch中的课程索引信息是根据搜索需要将课程名称、课程介绍等信息进行索引存储。
MinIO中存储了课程的静态化页面文件(html网页),查看课程详情是通过文件系统去浏览课程详情页面。
2、什么是分布式事务
一次课程发布操作需要向数据库、redis、elasticsearch、MinIO写四份数据,这里存在分布式事务问题。
什么是分布式事务?
首先理解什么是本地事务?
平常我们在程序中通过spring去控制事务是利用数据库本身的事务特性来实现的,因此叫数据库事务,由于应用主要靠关系数据库来控制事务,此数据库只属于该应用,所以基于本应用自己的关系型数据库的事务又被称为本地事务。
本地事务具有ACID四大特性,数据库事务在实现时会将一次事务涉及的所有操作全部纳入到一个不可分割的执行单元,该执行单元中的所有操作 要么都成功,要么都失败,只要其中任一操作执行失败,都将导致整个事务的回滚。
理解了本地事务,什么是分布式事务?
现在的需求是课程发布操作后将数据写入数据库、redis、elasticsearch、MinIO四个地方,这四个地方已经不限制在一个数据库内,是由四个分散的服务去提供,与这四个服务去通信需要网络通信,而网络存在不可到达性,这种分布式系统环境下,通过与不同的服务进行网络通信去完成事务称之为分布式事务。
3、 什么是CAP理论
控制分布式事务首先需要理解CAP理论,什么是CAP理论?
CAP是 Consistency、Availability、Partition tolerance三个词语的缩写,分别表示一致性、可用性、分区容忍性。
使用下边的分布式系统结构 进行说明:
客户端经过网关访问用户服务的两个结点,一致性是指用户不管访问哪一个结点拿到的数据都是最新的,比如查询小明的信息,不能出现在数据没有改变的情况下两次查询结果不一样。
可用性是指任何时候查询用户信息都可以查询到结果,但不保证查询到最新的数据。
分区容忍性也叫分区容错性,当系统采用分布式架构时由于网络通信异常导致请求中断、消息丢失,但系统依然对外提供服务。
CAP理论要强调的是在分布式系统中这三点不可能全部满足,由于是分布式系统就要满足分区容忍性,因为服务之间难免出现网络异常,不能因为局部网络异常导致整个系统不可用。
满足P那么C和A不能同时满足:
比如我们添加一个用户小明的信息,该信息先添加到结点1中,再同步到结点2中,如下图:
如果要满足C一致性,必须等待小明的信息同步完成系统才可用(否则会出现请求到结点2时查询不到数据,违反了一致性),在信息同步过程中系统是不可用的,所以满足C的同时无法满足A。
如果要满足A可用性,要时刻保证系统可用就不用等待信息同步完成,此时系统的一致性无法满足。
所以在分布式系统中进行分布式事务控制,要么保证CP、要么保证AP。
4、什么是BASE理论?
BASE 是 Basically Available(基本可用)、Soft state(软状态)和 Eventually consistent (最终一致性)三个短语的缩写。
基本可用:当系统无法满足全部可用时保证核心服务可用即可,比如一个外卖系统,每到中午12点左右系统并发量很高,此时要保证下单流程涉及的服务可用,其它服务暂时不可用。
软状态:是指可以存在中间状态,比如:打印自己的社保统计情况,该操作不会立即出现结果,而是提示你打印中,请在XXX时间后查收。虽然出现了中间状态,但最终状态是正确的。
最终一致性:退款操作后没有及时到账,经过一定的时间后账户到账,舍弃强一致性,满足最终一致性。
5、分布式事务控制有哪些常用的技术方案?
实现CP就是要实现强一致性:
使用Seata框架基于AT模式实现
使用Seata框架基于TCC模式实现。
实现AP则要保证最终数据一致性:
使用消息队列通知的方式去实现,通知失败自动重试,达到最大失败次数需要人工处理;
使用任务调度的方案,启动任务调度将课程信息由数据库同步到elasticsearch、MinIO、redis中。
6、课程发布的事务控制方案
学习了这么多的理论,回到课程发布,执行课程发布操作后要向数据库、redis、elasticsearch、MinIO写四份数据,这个场景用哪种方案?
满足CP?
如果要满足CP就表示课程发布操作后向数据库、redis、elasticsearch、MinIO写四份数据,只要有一份写失败其它的全部回滚。
满足AP?
课程发布操作后,先更新数据库中的课程发布状态,更新后向redis、elasticsearch、MinIO写课程信息,只要在一定时间内最终向redis、elasticsearch、MinIO写数据成功即可。
目前我们已经有了任务调度的技术积累,这里选用任务调度的方案去实现分布式事务控制,课程发布满足AP即可。
下图是具体的技术方案:
1、在内容管理服务的数据库中添加一个消息表,消息表和课程发布表在同一个数据库。
2、点击课程发布通过本地事务向课程发布表写入课程发布信息,同时向消息表写课程发布的消息。通过数据库进行控制,只要课程发布表插入成功消息表也插入成功,消息表的数据就记录了某门课程发布的任务。
3、启动任务调度系统定时调度内容管理服务去定时扫描消息表的记录。
4、当扫描到课程发布的消息时即开始完成向redis、elasticsearch、MinIO同步数据的操作。
5、同步数据的任务完成后删除消息表记录。
时序图如下:
下图是课程发布操作的流程:
1、执行发布操作,内容管理服务存储课程发布表的同时向消息表添加一条“课程发布任务”。这里使用本地事务保证课程发布信息保存成功,同时消息表也保存成功。
2、任务调度服务定时调度内容管理服务扫描消息表,由于课程发布操作后向消息表插入一条课程发布任务,此时扫描到一条任务。
3、拿到任务开始执行任务,分别向redis、elasticsearch及文件系统存储数据。
4、任务完成后删除消息表记录。
7、课程发布接口开发
@Transactional@Overridepublic void publish(Long companyId, Long courseId) {//查询预发布表CoursePublishPre coursePublishPre = coursePublishPreMapper.selectById(courseId);if(coursePublishPre == null){XueChengPlusException.cast("课程没有审核记录,无法发布");}//状态String status = coursePublishPre.getStatus();//课程如果没有审核通过不允许发布if(!status.equals("202004")){XueChengPlusException.cast("课程没有审核通过不允许发布");}//向课程发布表写入数据CoursePublish coursePublish = new CoursePublish();BeanUtils.copyProperties(coursePublishPre,coursePublish);//先查询课程发布,如果有则更新,没有再添加CoursePublish coursePublishObj = coursePublishMapper.selectById(courseId);if(coursePublishObj == null){coursePublishMapper.insert(coursePublish);}else{coursePublishMapper.updateById(coursePublish);}//向消息表写入数据
// mqMessageService.addMessage("course_publish",String.valueOf(courseId),null,null);saveCoursePublishMessage(courseId);//将预发布表数据删除coursePublishPreMapper.deleteById(courseId);}
8、课程发布消息sdk
课程发布操作执行后需要扫描消息表的记录,有关消息表处理的有哪些?
上图中红色框内的都是与消息处理相关的操作:
1、新增消息表
2、扫描消息表。
3、更新消息表。
4、删除消息表。
使用消息表这种方式实现最终事务一致性的地方除了课程发布还有其它业务场景。
如果在每个地方都实现一套针对消息表定时扫描、处理的逻辑基本上都是重复的,软件的可复用性太低,成本太高。
如何解决这个问题
?
针对这个问题可以想到将消息处理相关的逻辑做成一个通用的东西。
是做成通用的服务,还是做成通用的代码组件呢?
通用的服务是完成一个通用的独立功能,并提供独立的网络接口,比如:项目中的文件系统服务,提供文件的分布式存储服务。
代码组件也是完成一个通用的独立功能,通常会提供API的方式供外部系统使用,比如:fastjson、Apache commons工具包等。
如果将消息处理做成一个通用的服务,该服务需要连接多个数据库,因为它要扫描微服务数据库下的消息表,并且要提供与微服务通信的网络接口,单就针对当前需求而言开发成本有点高。
如果将消息处理做一个SDK工具包相比通用服务不仅可以解决将消息处理通用化的需求,还可以降低成本。
所以,本项目确定将对消息表相关的处理做成一个SDK组件供各微服务使用,如下图所示:
下边对消息SDK的设计内容进行说明:
sdk需要提供执行任务的逻辑吗
?
拿课程发布任务举例,执行课程发布任务是要向redis、索引库等同步数据,其它任务的执行逻辑是不同的,所以执行任务在sdk中不用实现任务逻辑,只需要提供一个抽象方法由具体的执行任务方去实现。
如何保证任务的幂等性
?
在视频处理章节介绍的视频处理的幂等性方案,这里可以采用类似方案,任务执行完成后会从消息表删除,如果消息的状态是完成或不存在消息表中则不用执行。
如何保证任务不重复执行
?
采用和视频处理章节一致方案,除了保证任务的幂等性外,任务调度采用分片广播,根据分片参数去获取任务,另外阻塞调度策略为丢弃任务。
注意:这里是信息同步类任务,即使任务重复执行也没有关系,不再使用抢占任务的方式保证任务不重复执行。
还有一个问题,根据消息表记录是否存在或消息表中的任务状态去保证任务的幂等性,如果一个任务有好几个小任务,比如:课程发布任务需要执行三个同步操作:存储课程到redis、存储课程到索引库,存储课程页面到文件系统。如果其中一个小任务已经完成也不应该去重复执行。这里该如何设计?
将小任务作为任务的不同的阶段,在消息表中设计阶段状态。
9、课程发布任务调度
package com.xuecheng.content.service.jobhandler;import com.xuecheng.base.exception.XueChengPlusException;
import com.xuecheng.messagesdk.model.po.MqMessage;
import com.xuecheng.messagesdk.service.MessageProcessAbstract;
import com.xuecheng.messagesdk.service.MqMessageService;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class CoursePublishTask extends MessageProcessAbstract {//任务调度入口@XxlJob("CoursePublishJobHandler")public void coursePublishJobHandler() throws Exception {// 分片参数int shardIndex = XxlJobHelper.getShardIndex();//执行器的序号,从0开始int shardTotal = XxlJobHelper.getShardTotal();//执行器总数//调用抽象类的方法执行任务process(shardIndex,shardTotal, "course_publish",30,60);}//执行课程发布任务的逻辑,如果此方法抛出异常说明任务执行失败@Overridepublic boolean execute(MqMessage mqMessage) {//从mqMessage拿到课程idLong courseId = Long.parseLong(mqMessage.getBusinessKey1());//课程静态化上传到miniogenerateCourseHtml(mqMessage,courseId);//向elasticsearch写索引数据saveCourseIndex(mqMessage,courseId);//向redis写缓存//返回true表示任务完成return true;}//生成课程静态化页面并上传至文件系统private void generateCourseHtml(MqMessage mqMessage,long courseId){//消息idLong taskId = mqMessage.getId();MqMessageService mqMessageService = this.getMqMessageService();//做任务幂等性处理//查询数据库取出该阶段执行状态int stageOne = mqMessageService.getStageOne(taskId);if(stageOne>0){log.debug("课程静态化任务完成,无需处理...");return ;}//开始进行课程静态化int i=1/0;//制造一个异常表示任务执行中有问题//..任务处理完成写任务状态为完成mqMessageService.completedStageOne(taskId);}//保存课程索引信息 第二个阶段任务private void saveCourseIndex(MqMessage mqMessage,long courseId){//任务idLong taskId = mqMessage.getId();MqMessageService mqMessageService = this.getMqMessageService();//取出第二个阶段状态int stageTwo = mqMessageService.getStageTwo(taskId);//任务幂等性处理if(stageTwo>0){log.debug("课程索引信息已写入,无需执行...");return;}//查询课程信息,调用搜索服务添加索引 ...//完成本阶段的任务mqMessageService.completedStageTwo(taskId);}
}