电商系统秒杀一 秒杀的各种解决方案以及存在的问题

一 业务场景介绍

1.1 正常电商流程

在这里插入图片描述

1.2 活动和场次关系

秒杀活动表:sms_flash_promotion

DROP TABLE IF EXISTS `sms_flash_promotion`;
CREATE TABLE `sms_flash_promotion`  (`id` bigint(20) NOT NULL AUTO_INCREMENT,`title` varchar(200) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '活动名称',`start_date` date NULL DEFAULT NULL COMMENT '开始日期',`end_date` date NULL DEFAULT NULL COMMENT '结束日期',`status` int(1) NULL DEFAULT NULL COMMENT '上下线状态,1上线、0下线',`create_time` datetime(0) NULL DEFAULT NULL COMMENT '秒杀时间段名称',PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 8 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '限时购表' ROW_FORMAT = DYNAMIC;

秒杀场次表:sms_flash_promotion_session

DROP TABLE IF EXISTS `sms_flash_promotion_session`;
CREATE TABLE `sms_flash_promotion_session`  (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',`name` varchar(200) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '场次名称',`start_time` time(0) NULL DEFAULT NULL COMMENT '每日开始时间',`end_time` time(0) NULL DEFAULT NULL COMMENT '每日结束时间',`status` int(1) NULL DEFAULT NULL COMMENT '启用状态:0->不启用;1->启用',`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 8 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '限时购场次表' ROW_FORMAT = DYNAMIC;

场次商品关系表:sms_flash_promotion_product_relation

DROP TABLE IF EXISTS `sms_flash_promotion_product_relation`;
CREATE TABLE `sms_flash_promotion_product_relation`  (`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',`flash_promotion_id` bigint(20) NULL DEFAULT NULL COMMENT '秒杀活动ID->关联sms_flash_promotion表',`flash_promotion_session_id` bigint(20) NULL DEFAULT NULL COMMENT '当前日期活动场次编号',`product_id` bigint(20) NULL DEFAULT NULL COMMENT '产品ID',`flash_promotion_price` decimal(10, 2) NULL DEFAULT NULL COMMENT '限时购价格',`flash_promotion_count` int(11) NULL DEFAULT NULL COMMENT '限时购数量',`flash_promotion_limit` int(11) NULL DEFAULT NULL COMMENT '每人限购数量',`sort` int(11) NULL DEFAULT NULL COMMENT '排序',PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 44 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '商品限时购与商品关系表' ROW_FORMAT = DYNAMIC;

一个活动可以有多个场次,每个场次可以有多个商品进行秒-杀。

二 秒杀系统设计

分两部分内容;秒杀业务设计和秒杀技术实现。

2.1 秒杀业务

秒杀业务的特性:
在这里插入图片描述
秒杀业务设计:
在这里插入图片描述
在这里插入图片描述
营销工具:系统整理的促销工具,可以对某些特定的工具详细解释。
营销活动:从营销工具中提出创建一个活动。
营销活动订单:针对营销活动产生的订单

商品级优惠:
限时促销(商品级)、
限时抢购(商品级)
秒杀(商品级)
商品包邮(商品级)

订单级优惠:
满就赠(订单级)
满立减(订单级)
送优惠券(订单级)
折扣(订单级)
Vip折扣(订单级)
订单包邮(订单级)

全站促销:
优惠券
优化券补发
银行促销
支付红包
团购预售
微信砍价

商品限时秒杀(商品级别)
是一款用于常规的营销活动,在限时促销上增加“排除参与活动”、“限制用户购买次数”、“限购种类”、“未付款取消时间”、“活动商品限制库存”等功能,是限时促销促销的增强版,常用于用户拉新、日常的秒杀、日常活动。促销渠道(app,pc,wap,global_app,fresh_app)等

订单满额减(订单级别)
常用促销工具,有满X元减Y元、满X件减Y元,支持叠加满减,订单商品满减金额,支持限制用户参与次数,可设置包括享受优惠的商品分类,商品品牌,商品、促销会员等级,会员标签,促销渠道(app,pc,wap,global_app,fresh_app),订单可享受满减的支付门槛金额等,如购买全场商品,订单满100元优惠20元

银行促销(全站)
常用促销工具,与银行合作在一段时间内每周固定几天进行优惠,可设置用户总参与次数,每天总活动次数,在用户进行支付时进行减免。当前只有光大银行每周二、周六有活动,参与渠道只有pc、h5端,支持排除部分商品,通常是虚拟商品

2.1 秒杀技术

秒杀技术特性:
在这里插入图片描述
单一职责:
秒杀流量是占比比较重的一环,所以要独立部署,与其他业务分开,互不影响。扩容容易。
防止超卖:
100个库存,1000个人购买,如何保证其中100个人能买到
限流、熔断、降级:
主要是防止程序蹦掉。核心就是限制次数、限制总量、快速失败、降级运行
队列削峰:
12306中选择购票时,选择自己靠窗座位时,所有下单请求,加入队列,满满匹配撮合。
流量错峰、防刷:
使用各种手段、将流量分担到更大宽度的时间点、比如验证码、F码
预热、快速扣减:
秒杀读多写少(访问商品人数往往大于购买人数)。活动和库存都可以提前预热。比如把
数据放到redis中。
动静分离:
nginx做好动静分离、使用CDN网络、分担后端的相应压力。

三 秒杀实战

核心问题: 一个是并发读,一个是并发写;
数据库(1.2章节里已经建好表了):
秒杀场次表:sms_flash_promotion_session
秒杀活动表:sms_flash_promotion
场次商品关系表:sms_flash_promotion_product_relation

3.1 下单流程

在这里插入图片描述
下单秒杀确认接口:

 @RequestMapping(value = "/miaosha/generateConfirmOrder",method = RequestMethod.POST)@ResponseBodypublic CommonResult generateMiaoShaConfirmOrder(@RequestParam("productId") Long productId,String token,@RequestHeader("memberId") Long memberId) throws BusinessException {return secKillOrderService.generateConfirmMiaoShaOrder(productId,memberId,token);}

3.2 确认下单流程

一、检查方法:confirmCheck
1、检查本地缓存售罄状态
2、校验是否有权限购买token
3、判断redis库存是否充足
4、 检查是否正在排队当中
二、调用会员服务获取会员信息
fegin远程调用
三、产品服务获取产品信息
四、验证秒杀时间是否超时
五、获取用户收获列表
六、构建商品信息
七、计算金额
八、会员积分

下单方式:0->同步下单。1->异步下单排队中。-1->秒杀失败。>1->秒杀成功(返回订单号)

流程:
1、检查方法:confirmCheck
2、 从产品服务获取产品信息
3、 验证秒杀时间是否超时
4、调用会员服务获取会员信息
5、通过Feign远程调用 会员地址服务
6、预减库存 ####(异步流程才需要这块,数据库锁不需要 此操作)
7、生成下单商品信息
8、库存处理 ####

confirmCheck方法如下

private CommonResult confirmCheck(Long productId, Long memberId, String token) throws BusinessException {/*1、设置标记,如果售罄了在本地cache中设置为true*/Boolean localcache = cache.getCache(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId);if (localcache != null && localcache) {return CommonResult.failed("商品已经售罄,请购买其它商品!");}/**2、 校验是否有权限购买token TODO 楼兰*//*  String redisToken = redisOpsUtil.get(RedisKeyPrefixConst.MIAOSHA_TOKEN_PREFIX + memberId + ":" + productId);if(StringUtils.isEmpty(redisToken) || !redisToken.equals(token)){return CommonResult.failed("非法请求,token无效!");}*///3、从redis缓存当中取出当前要购买的商品库存,RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX:miaosha:stock:cache:Integer stock = redisOpsUtil.get(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId, Integer.class);if (stock == null || stock <= 0) {/*设置标记,如果售罄了在本地cache中设置为true*/cache.setLocalCache(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId, true);return CommonResult.failed("商品已经售罄,请购买其它商品!");}String async = redisOpsUtil.get(RedisKeyPrefixConst.MIAOSHA_ASYNC_WAITING_PREFIX + memberId + ":" + productId);if (async != null && async.equals("1")) {Map<String, Object> result = new HashMap<>();result.put("orderStatus", "1");//下单方式0->同步下单,1->异步下单排队中,-1->秒杀失败,>1->秒杀成功(返回订单号)return CommonResult.failed(result, "异步下单排队中");}return CommonResult.success(null);}

秒杀流程核心点为:
1、价格计算 2、库存处理

商品级别优惠计算:
在这里插入图片描述
订单级别计算优惠:
在这里插入图片描述

3.3 库存问题——超卖

高并发下会出现超卖问题。问题如下
在这里插入图片描述
线程一查询库存100个,然后进行扣减库存。
线程二查询库存也是100个,然后也进行扣减库存、
实际情况是:两个线程都扣减了库存,买了两件商品,但是库存只扣了一次,订单有两笔订单,但是库存只扣了一个。这就是库存超卖问题。

何时扣减库存:
1、下单时扣减
2、支付时扣减

3.4 库存解决

如何解决库存超卖问题,是我们秒杀非常重要的一个问题。
我们接下来会学习到用数据库的锁、用redis的特性、异步下单等解决方案来解决。

悲观锁操作:

begin;
select flash_promotion_count from sms_flash_promotion_product_relation where id=43 for UPDATE;
update sms_flash_promotion_product_relation set flash_promotion_count=flash_promotion_count-1 where id=43;
# ROLLBACK;
commit

for UPDATE:行级锁,使用不当的话会导致表级锁;sql语句不走索引的话就会把整张表锁住;

select…for update是MySQL提供的实现悲观锁的方式。此时在秒杀表中,id为43的那条数据就被我们锁定了,其它的要执行select * from 秒杀表 where id=43 for update的事务必须等本次事务提交之后才能执行。这样我们可以保证当前的数据不会被其它事务修改。

MySQL还有个问题是select…for update语句执行中所有扫描过的行都会被锁上,因此在MySQL中用悲观锁务必须确定走了索引,而不是全表扫描,否则将会将整个数据表锁住。

for update 悲观锁 行锁还有条件:就是要能查询到记录、并且走了索引才是行锁。某些情况可能是锁整张表。

因此悲观锁并不是适用于任何场景,它也存在一些不足,因为悲观锁大多数情况下依靠数据库的锁机制实现,以保证操作最大程度的独占性。如果加锁的时间过长,其他用户长时间无法访问,影响了程序的并发访问性,同时这样对数据库性能开销影响也很大,特别是对长事务而言,这样的开销往往无法承受,这时就需要乐观锁。

乐观锁操作:
乐观锁相对悲观锁而言,它认为数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则让返回错误信息,让用户决定如何去做。

版本号的实现方式有两种,一个是数据版本机制,一个是时间戳机制。
在这里插入图片描述

begin;
select flash_promotion_count from sms_flash_promotion_product_relation where id=43 ;
update sms_flash_promotion_product_relation set flash_promotion_count=flash_promotion_count ,version=version+1 where id=43 and version=#version#;
# ROLLBACK;
Commit

这样除了select查询库存,还需要更新库存,其实还有插入insert order orderlog orderdetail等需要插入数据库。库存更新没问题,但是插入订单时失败了是不是要回滚,如果不在一个事务就会出错。如果在一个事务,那又涉及到事务过长甚至可能是跨库然后无法用本地事务来解决。

上边的解决方式存在的问题汇总
有三个问题、性能问题、个数问题、架构问题。

1、性能问题:
无论是悲观锁还是乐观锁对需要对数据库进行上锁,而我们数据库的资源是非常有限的。

2、个数问题:

<!--扣减库存 防止库存超卖-->
<update id="descStock">UPDATE sms_flash_promotion_product_relationSET flash_promotion_count = CASEWHEN flash_promotion_count>=#{stock} THENflash_promotion_count - #{stock}ELSEflash_promotion_countENDWHEREid = #{id}
</update>

如果库存数量只有1个了,但是现在小明下单这时要买两个,那这条sql语句就有问题了,我们库存只有一个,很明显不够卖了吧。所以这里要判断下,库存数大于购买数才能购买

3、架构问题
1000个人来抢就意味着有1000个人来请求数据库尝试扣减库存。
假设我数据库只有10减商品,意味着990个请求是没有意义的。
那这样说的话这种架构有优化的空间吧;

3.5 Redis2.0版本解决库存问题

刚才我们看了用数据库的话性能相对来说是有很大瓶颈的,瓶颈在哪儿了?我们先抛开超卖的问题,我们回到整个业务的本质来说,秒杀的场景一般都是商品比较实惠的,而大众都有贪图便宜的这个心态,那商家为了吸引顾客会以比较少的商品来吸引比较多的顾客,就是顾客多商品少,那就意味着大部分人是买不到商品的,就好比库存只有10个,但是现在有100个人购买或者1000个人准备下单购买。但是里面只有10个人才能买到。这大量的请求数据库是受不了的。

正常下单
在这里插入图片描述
预下单:
根据这种情况我们可以把库存放到redis里面,秒杀下单时,先从redis里面获取库存数量,然后根据库存数量判断是否可以进行下一步,如果有库存就直接下单,如果没有库存就不能下单。这样做的好处是什么? 可以拦截大部分流量进入到数据库中,刚才我们说过了上述的业务场景问题,简称就是狼多肉少吧,这一步我们也叫下单流程中的预下单

//3、从redis缓存当中取出当前要购买的商品库存
Integer stock = redisOpsUtil.get(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId, Integer.class);
if (stock == null || stock <= 0) {return CommonResult.failed("商品已经售罄,请购买其它商品!");
}

预售库存:
我们现在库存不从数据库里面扣减,而是从redis里面获取,那请问我们redis扣减库存这个数量从哪儿来的?

可以开一个定时任务,在开卖前几分钟或者几小时,把mysql里的数据同步到redis里;
在这里插入图片描述

//3、从redis缓存当中取出当前要购买的商品库存
Integer stock = redisOpsUtil.get(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId, Integer.class);
if (stock == null || stock <= 0) {/*设置标记,如果售罄了在本地cache中设置为true*/cache.setLocalCache(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId, true);return CommonResult.failed("商品已经售罄,请购买其它商品!");
}
/** 订单下单前的购买与检查*/
private CommonResult confirmCheck(Long productId, Long memberId, String token) throws BusinessException {/*1、设置标记,如果售罄了在本地cache中设置为true*/Boolean localcache = cache.getCache(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId);if (localcache != null && localcache) {return CommonResult.failed("商品已经售罄,请购买其它商品!");}//3、从redis缓存当中取出当前要购买的商品库存Integer stock = redisOpsUtil.get(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId, Integer.class);if (stock == null || stock <= 0) {/*设置标记,如果售罄了在本地cache中设置为true*/cache.setLocalCache(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId, true);return CommonResult.failed("商品已经售罄,请购买其它商品!");}return CommonResult.success(null);}

存在的问题:
我们可以发现本地的缓存级别是jvm级别的,而各自的jvm售罄状态是不一样的,每个jvm只能修改自己本身的售罄状态,但是不能影响别的jvm状态。

3.6 解决方案

在这里插入图片描述

上边的这种情况存在问题,项目可能是集群部署,那就会导致各自的JVM获取到的售罄状态不一样,有如下几种解决方案

1、方案一 zookeeper
可以用zookeeper的watch机制来实现,让毎个jvm都监听zk的某个就节点,一旦数据有改变之后通知到其他节点上
在这里插入图片描述
原理:
在这里插入图片描述
一个jvm发现售罄时,就发消息到zk,此时监听zk的其他jvm就能感知到这个售罄的消息,这样就可以解决多个jvm售罄状态不同步的问题;

zk存在的问题:半数以上的机制,会导致延迟;
有点:高可用(也是因为半数以上机制)

2、方案二 redis
利用redis的channel机制实现(类似于消息中间件mq);

一个客户端订阅主题
在这里插入图片描述
订阅主题的命令:subscribe monkey

一个客户端向订阅的主题(channel)发送消息:
在这里插入图片描述
向主题发消息命令:publish monkey hello

此时其他订阅这个主题的客户端都能收到这个消息;

//通知服务群,清除本地售罄标记缓存if (shouldPublishCleanMsg(productId)) {redisOpsUtil.publish("cleanNoStockCache", productId);}

监听类:
监听到卖完后,就删除缓存;其实最好是改变状态(改变值),而不是清除,清除缓存的话,就会有歧义——是卖完了还是没有卖?


import com.tuling.tulingmall.common.constant.RedisKeyPrefixConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.lang.Nullable;import java.nio.charset.StandardCharsets;@Slf4j
public class RedisChannelListener implements MessageListener {@Autowiredprivate LocalCache localCache;@Overridepublic void onMessage(Message message, @Nullable byte[] pattern) {log.info("sub message :) channel[cleanNoStockCache] !");String productId = new String(message.getBody(), StandardCharsets.UTF_8);localCache.remove(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId);}
}

redis这种发布与订阅是没有ack的,发出去了不会管有没有收到,这是它的不足(可靠性弱);那优点是什么?优点就是其缺点,吞吐量相当来说就会提高,因为减少了通讯(磁盘IO),那处理数据的能力就就会上升;

3、方案三 mq等其他方式
利用消息队列broker也能解决这个问题,缺点是数据要刷到磁盘,性能较低,这里不细述了;

3.7 秒杀商品的预热【product】

在项目启动的时候就把秒杀商品的库存放到redis中

import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.tuling.tulingmall.component.RedisChannelListener;
import com.tuling.tulingmall.util.RedisOpsUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Slf4j
@Configuration
public class RedisConifg {@Autowiredprivate RedisConnectionFactory connectionFactory;@Bean@Primarypublic RedisTemplate<String,Object> redisTemplate(){RedisTemplate<String,Object> template = new RedisTemplate();template.setConnectionFactory(connectionFactory);// 序列化工具Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);ObjectMapper om = new ObjectMapper();om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);jackson2JsonRedisSerializer.setObjectMapper(om);StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();template.setKeySerializer(stringRedisSerializer);template.setValueSerializer(jackson2JsonRedisSerializer);template.setHashKeySerializer(jackson2JsonRedisSerializer);template.setHashValueSerializer(jackson2JsonRedisSerializer);template.afterPropertiesSet();return template;}@Beanpublic RedisOpsUtil redisOpsUtil(){return new RedisOpsUtil();}@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(){RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(messageListenerAdapter(),channelTopic());return container;}@BeanMessageListenerAdapter messageListenerAdapter(){return new MessageListenerAdapter(redisChannelListener());}@BeanRedisChannelListener redisChannelListener(){return new RedisChannelListener();}@BeanChannelTopic channelTopic(){return new ChannelTopic("cleanNoStockCache");}
}

四 异步下单

之前redis方案不足: 生产环境中,数据库需要insert 很多表,所以数据库需要优化,优化思路如下
1、颠覆性 :mysql 换为oracle
2、改进型:mysql添加索引、做分库分表、读写分离等;

以上两种思路,这里暂时不讲,这里使用消息中间件来继续优化;上边已经说过了,下单时直接插入数据库是有问题的(需要insert很多表,有性能问题,而且瞬时流量太大),那么此时可以采用MQ中间件来解决,即下单时先把数据放在MQ里,再通过MQ异步插入数据库;

消息中间件三大特性:异步、解耦、削峰填谷;
在这里插入图片描述
redis里的值(下单方式):
0:同步下单
1:异步下单排队中
-1:秒杀失败
大于1:返回订单号,跳转支付页

异步下单

/*** 秒杀订单下单** @param orderParam* @param memberId* @return*/@Override//@Transactional TODO 如果是数据库控制 防止超卖。Transactional有意义吗?public CommonResult<Map<String, Object>> generateSecKillOrder(OrderParam orderParam, Long memberId, String token) throws BusinessException {Long productId = orderParam.getItemIds().get(0);CommonResult commonResult = confirmCheck(productId, memberId, token);if (commonResult.getCode() == 500) {return commonResult;}//【2】 从产品服务获取产品信息PmsProductParam product = getProductInfo(productId);//【3】 验证秒杀时间是否超时if (!volidateMiaoShaTime(product)) {return CommonResult.failed("秒杀活动未开始或已结束!");}//【4】 调用会员服务获取会员信息UmsMember member = umsMemberFeignApi.getMemberById().getData();//【5】 通过Feign远程调用 会员地址服务UmsMemberReceiveAddress address = umsMemberFeignApi.getItem(orderParam.getMemberReceiveAddressId()).getData();//预减库存if (!preDecrRedisStock(productId, product.getFlashPromotionRelationId())) {return CommonResult.failed("下单失败,已经抢购完了");}//准备创建订单//生成下单商品信息OmsOrderItem orderItem = new OmsOrderItem();orderItem.setProductId(product.getId());orderItem.setProductName(product.getName());orderItem.setProductPic(product.getPic());orderItem.setProductBrand(product.getBrandName());orderItem.setProductSn(product.getProductSn());orderItem.setProductPrice(product.getFlashPromotionPrice());orderItem.setProductQuantity(1);orderItem.setProductCategoryId(product.getProductCategoryId());orderItem.setPromotionAmount(product.getPrice().subtract(product.getFlashPromotionPrice()));orderItem.setPromotionName("秒杀特惠活动");orderItem.setGiftIntegration(product.getGiftPoint());orderItem.setGiftGrowth(product.getGiftGrowth());orderItem.setCouponAmount(new BigDecimal(0));orderItem.setIntegrationAmount(new BigDecimal(0));orderItem.setPromotionAmount(new BigDecimal(0));//支付金额BigDecimal payAmount = product.getFlashPromotionPrice().multiply(new BigDecimal(1));//优惠价格orderItem.setRealAmount(payAmount);OmsOrder order = new OmsOrder();order.setDiscountAmount(product.getPrice().subtract(product.getFlashPromotionPrice()));//折扣金额order.setFreightAmount(new BigDecimal(0));//运费金额order.setPromotionAmount(new BigDecimal(0));order.setPromotionInfo("秒杀特惠活动");order.setTotalAmount(payAmount);order.setIntegration(0);order.setIntegrationAmount(new BigDecimal(0));order.setPayAmount(payAmount);order.setMemberId(memberId);order.setMemberUsername(member.getUsername());order.setCreateTime(new Date());//设置支付方式:0->未支付,1->支付宝,2->微信order.setPayType(orderParam.getPayType());//设置支付方式:0->PC订单,1->APP订单,2->小程序order.setSourceType(0);//订单状态:0->待付款;1->待发货;2->已发货;3->已完成;4->已关闭;5->无效订单order.setStatus(0);//订单类型:0->正常订单;1->秒杀订单order.setOrderType(1);//用户收货信息order.setReceiverName(address.getName());order.setReceiverPhone(address.getPhoneNumber());order.setReceiverPostCode(address.getPostCode());order.setReceiverProvince(address.getProvince());order.setReceiverCity(address.getCity());order.setReceiverRegion(address.getRegion());order.setReceiverDetailAddress(address.getDetailAddress());//0->未确认;1->已确认order.setConfirmStatus(0);order.setDeleteStatus(0);//计算赠送积分order.setIntegration(product.getGiftPoint());//计算赠送成长值order.setGrowth(product.getGiftGrowth());//生成订单号-理论上唯一// order.setOrderSn(generateOrderSn(order));/*----------------------------------基本方案(下单时直接插入数据库的方案,已注释掉,采用后边的消息中间件的方式)---------------------------------------*//*try {//【悲观锁】Integer dbStock = miaoShaStockDao.selectMiaoShaStockInLock(product.getFlashPromotionRelationId());if(dbStock <= 0){return CommonResult.failed("商品已抢完!");}miaoShaStockDao.descStockInLock(product.getFlashPromotionRelationId(),dbStock-1);//【乐观锁】减库存,DB乐观锁减库存实现Integer dbStock = miaoShaStockDao.selectMiaoShaStock(product.getFlashPromotionRelationId());if(dbStock <= 0){return CommonResult.failed("商品已抢完!");}Integer id = miaoShaStockDao.descStockInVersion(product.getFlashPromotionRelationId(),dbStock,dbStock-1);if(id <= 0){return CommonResult.failed("没抢到!再接再厉!");}int resultDb = miaoShaStockDao.descStock(product.getFlashPromotionRelationId(),1);if(resultDb > 0 ){//插入订单记录orderMapper.insertSelective(order);//OrderItem关联orderItem.setOrderId(order.getId());orderItem.setOrderSn(order.getOrderSn());//插入orderItemorderItemMapper.insertSelective(orderItem);}else{return CommonResult.failed();}} catch (Exception e) {log.error("create order failure:)",e.getMessage(),e.getCause());//补回已经减掉的库存!incrRedisStock(productId);//通知服务群,清除本地售罄标记缓存redisOpsUtil.publish("cleanNoStockCache",productId);throw new BusinessException("创建订单失败!");}List<OmsOrderItem> itemList = new ArrayList<>();itemList.add(orderItem);Map<String,Object> result = new HashMap<>();result.put("order",order);result.put("orderItem",itemList);0//下单方式0->同步下单,1->异步下单排队中,-1->秒杀失败result.put("orderStatus","0");*//*******************************异步下单******************************************/OrderMessage orderMessage = new OrderMessage();orderMessage.setOrder(order);orderMessage.setOrderItem(orderItem);orderMessage.setFlashPromotionRelationId(product.getFlashPromotionRelationId());orderMessage.setFlashPromotionLimit(product.getFlashPromotionLimit());orderMessage.setFlashPromotionEndDate(product.getFlashPromotionEndDate());Map<String, Object> result = new HashMap<>();List<OmsOrderItem> itemList = new ArrayList<>();itemList.add(orderItem);result.put("order", order);result.put("orderItemList", itemList);try {//发送消息到MQboolean sendStatus = orderMessageSender.sendCreateOrderMsg(orderMessage);if (sendStatus) {/** 打上排队的标记,1:排队中*/redisOpsUtil.set(RedisKeyPrefixConst.MIAOSHA_ASYNC_WAITING_PREFIX + memberId + ":" + productId, Integer.toString(1), 60, TimeUnit.SECONDS);/** 下单方式0->同步下单,1->异步下单排队中,-1->秒杀失败*/result.put("orderStatus", 1);} else {/** 还原预减库存*/incrRedisStock(productId);/** 清除掉本地guavacache已经售完的标记*/cache.remove(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId);//通知服务群,清除本地售罄标记缓存if (shouldPublishCleanMsg(productId)) {redisOpsUtil.publish("cleanNoStockCache", productId);}result.put("orderStatus", -1);return CommonResult.failed(result, "下单失败");}} catch (Exception e) {log.error("消息发送失败:error msg:{}", e.getMessage(), e.getCause());/** 还原预减库存*/incrRedisStock(productId);/** 清除掉本地guavacache已经售完的标记*/cache.remove(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId);//通知服务群,清除本地售罄标记缓存if (shouldPublishCleanMsg(productId)) {redisOpsUtil.publish("cleanNoStockCache", productId);}result.put("orderStatus", -1);return CommonResult.failed(result, "下单失败");}return CommonResult.success(result, "下单中.....");}

如果是异步下单的话,则需要定时任务来查询订单状态(前端发起的请求):

@ApiOperation("根据购物车信息生成订单")
@GetMapping("/miaosha/result")
@ResponseBody
public CommonResult miaoShaResult(@RequestParam("productId") Long productId,@RequestHeader("memberId") Long memberId){String status = redisOpsUtil.get(RedisKeyPrefixConst.MIAOSHA_ASYNC_WAITING_PREFIX + memberId+ ":" + productId);if(ObjectUtils.isEmpty(status)){return CommonResult.success(null,"无正在秒杀中的订单!");}if(status.equals("-1")){return CommonResult.success(status,"秒杀失败!");}if(status.equals("1")){return CommonResult.success(status,"正在排队中,请耐心等待!");}//如果Status>1,则秒杀成功,返回订单编号return CommonResult.success(status);
}

4.1 项目里涉及到的消息队列里的topic

4.1.1 处理未支付的订单(延迟消息来处理,20分钟,第五章节里还会细讲)

利用rocketmq延迟消息的一个特性来解决“定时任务”来取消订单操作。即利用MQ的延迟消息功能,消息20分钟没有被消费(没支付)的话,就把消息回滚到broker里;

Topic名字:order-status-check

生产端:

public boolean sendTimeOutOrderMessage(String cancelId){Message message = MessageBuilder.withPayload(cancelId).setHeader(RocketMQHeaders.KEYS, cancelId).build();SendResult result = rocketMQTemplate.syncSend(scheduleTopic+":"+TAG,message,5000,15);return SendStatus.SEND_OK == result.getSendStatus();
}

消费端:


import com.tuling.tulingmall.service.OmsPortalOrderService;
import com.tuling.tulingmall.service.SecKillOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** 消费监听rocketmq-订单超时消息* @author yangguo*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.tulingmall.cancelGroup}", topic = "${rocketmq.tulingmall.scheduleTopic}")
public class RocketMqCancelOrderReciever implements RocketMQListener<String> {@Autowiredprivate OmsPortalOrderService omsPortalOrderService;@Autowiredprivate SecKillOrderService secKillOrderService;/*** 延时消息,取消超时订单* @param cancelId*/@Overridepublic void onMessage(String cancelId) {if(StringUtils.isEmpty(cancelId)){return;}Long orderId = Long.parseLong(cancelId.split(":")[0]);Long promotionId = Long.parseLong(cancelId.split(":")[1]);Long productId = Long.parseLong(cancelId.split(":")[2]);try {//取消的订单,释放DB库存omsPortalOrderService.cancelOrder(orderId,promotionId);//取消的订单-还原缓存库存secKillOrderService.incrRedisStock(productId);} catch (Exception e) {log.error("订单取消异常 : 还原库存失败,please check:{}",e.getMessage(),e.getCause());throw new RuntimeException();//抛异常出去,rocketmq会重新投递}}
}

取消订单逻辑

    @Overridepublic void cancelOrder(Long orderId,Long memberId) {//查询为付款的取消订单OmsOrderExample example = new OmsOrderExample();example.createCriteria().andIdEqualTo(orderId).andStatusEqualTo(0).andDeleteStatusEqualTo(0);List<OmsOrder> cancelOrderList = orderMapper.selectByExample(example);if (CollectionUtils.isEmpty(cancelOrderList)) {return;}OmsOrder cancelOrder = cancelOrderList.get(0);if (cancelOrder != null) {//修改订单状态为取消cancelOrder.setStatus(4);orderMapper.updateByPrimaryKeySelective(cancelOrder);OmsOrderItemExample orderItemExample = new OmsOrderItemExample();orderItemExample.createCriteria().andOrderIdEqualTo(orderId);List<OmsOrderItem> orderItemList = orderItemMapper.selectByExample(orderItemExample);//解除订单商品库存锁定if (!CollectionUtils.isEmpty(orderItemList)) {portalOrderDao.releaseSkuStockLock(orderItemList);}//修改优惠券使用状态updateCouponStatus(cancelOrder.getCouponId(), cancelOrder.getMemberId(), 0);//返还使用积分if (cancelOrder.getUseIntegration() != null) {//todo 这里需要做分布式事务UmsMember umsMember = umsMemberFeignApi.getMemberById().getData();umsMember.setIntegration(umsMember.getIntegration()+cancelOrder.getUseIntegration());CommonResult<String> result= umsMemberFeignApi.updateUmsMember(umsMember);if(result.getCode() == ResultCode.FAILED.getCode()) {log.warn("远程调用会员服务扣除用户积分异常");throw new RuntimeException("远程调用会员服务扣除用户积分异常");}}}}

4.1.2 Canal同步topic

Topic名字:productDetailChange

生产端:canal

消费端:


import com.alibaba.otter.canal.protocol.FlatMessage;
import com.tuling.tulingmall.common.constant.RedisKeyPrefixConst;
import com.tuling.tulingmall.domain.PmsProductParam;
import com.tuling.tulingmall.util.ClassUtil;
import com.tuling.tulingmall.util.RedisOpsUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;/*** @description: 产品表信息修改-同步更新到Redis-Cache,ELK,Hadoop,相关下游服务**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.canal.topic}",consumerGroup = "${rocketmq.canal.group}")
public class RefreshCacheListener implements RocketMQListener<FlatMessage> {@Autowiredprivate RedisOpsUtil redisOpsUtil;private final static String PRODUCT = "pms_product";private final static String SKU = "pms_sku_stock";/*** 异步下单消费消息* @param flatMessage*/@Overridepublic void onMessage(FlatMessage flatMessage) {log.info("database:{},event-type:{},old-row-data:{},new-row-data:{}",flatMessage.getDatabase(),flatMessage.getType(),flatMessage.getOld(),flatMessage.getData());//修改后的新记录List<Map<String, String>> records = flatMessage.getData();//修改前的数据List<Map<String, String>> old = flatMessage.getOld();switch (flatMessage.getType().toUpperCase()){case "UPDATE":updateCache(records,old,flatMessage.getTable());break;case "DELETE":records.stream().forEach((item)->{//删除缓存redisOpsUtil.delete(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + item.get("id"));});break;}}public void updateCache(List<Map<String, String>> records,List<Map<String, String>> old,String table){int index = 0;/** 被更改的Row所有更改的行*/for(Map<String,String> row : old){Map<String, String> currentRow = records.get(index);String redisKey = RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + getProductId(currentRow,table);PmsProductParam product = redisOpsUtil.get(redisKey,PmsProductParam.class);if(!ObjectUtils.isEmpty(product)){Iterator<Map.Entry<String, String>> iterator = row.entrySet().iterator();while(iterator.hasNext()){Map.Entry entry = iterator.next();String key = (String) entry.getKey();//刷新产品数据product = refresh(product,table,key,currentRow);}/** 更新缓存内容,并设置过期时间*/long expired = redisOpsUtil.getExpired(redisKey, TimeUnit.SECONDS);redisOpsUtil.set(redisKey,product,expired,TimeUnit.SECONDS);}++index;}}/*** 更新缓存数据* @param product* @param table* @param key* @param currentRow* @return*/private PmsProductParam refresh(PmsProductParam product,String table,String key,Map<String, String> currentRow){if(PRODUCT.equals(table)){ClassUtil.callSetterMethod(product,ClassUtil.getSetterMethodName(key),currentRow.get(key));}else if(SKU.equals(table)){product.getSkuStockList().stream().forEach((item)->{if(item.getId() == Long.parseLong(currentRow.get("id"))){ClassUtil.callSetterMethod(item,ClassUtil.getSetterMethodName(key),currentRow.get(key));}});}return product;}/** 获取产品ID*/private String getProductId(Map<String, String> row,String table){if(PRODUCT.equals(table)){return row.get("id");}else{return row.get("product_id");}}}

4.1.3 async-order异步下单topic:

实际生产订单(发送端):SecKillOrderServiceImpl#asyncCreateOrder

@Transactional//这里可以使用分布式事物来优化public Long asyncCreateOrder(OmsOrder order, OmsOrderItem orderItem, Long flashPromotionRelationId) {//减库存Integer result = miaoShaStockDao.descStock(flashPromotionRelationId, 1);if (result <= 0) {throw new RuntimeException("没抢到!");}//插入订单记录orderMapper.insertSelective(order);//OrderItem关联orderItem.setOrderId(order.getId());orderItem.setOrderSn(order.getOrderSn());//插入orderItemorderItemMapper.insertSelective(orderItem);/** 如果订单创建成功,需要发送定时消息,20min后如果没有支付,则取消当前订单,释放库存*/try {boolean sendStatus = orderMessageSender.sendTimeOutOrderMessage(order.getId() + ":" + flashPromotionRelationId + ":" + orderItem.getProductId());if (!sendStatus) {throw new RuntimeException("订单超时取消消息发送失败!");}} catch (Exception e) {throw new RuntimeException("订单超时取消消息发送失败!");}return order.getId();}

处理订单(消费端):com.tuling.tulingmall.component.rocketmq.AscynCreateOrderReciever


import com.tuling.tulingmall.common.constant.RedisKeyPrefixConst;
import com.tuling.tulingmall.component.LocalCache;
import com.tuling.tulingmall.domain.OrderMessage;
import com.tuling.tulingmall.service.SecKillOrderService;
import com.tuling.tulingmall.util.RedisOpsUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Date;
import java.util.concurrent.TimeUnit;/*** @description: 消费监听rocketmq-订单消息**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.tulingmall.asyncOrderTopic}",consumerGroup = "${rocketmq.tulingmall.asyncOrderGroup}")
public class AscynCreateOrderReciever implements RocketMQListener<OrderMessage> {@Autowiredprivate SecKillOrderService secKillOrderService;@Autowiredprivate RedisOpsUtil redisOpsUtil;@Autowiredprivate LocalCache<Object> cache;/*** 异步下单消费消息* @param orderMessage*/@Overridepublic void onMessage(OrderMessage orderMessage) {log.info("listen the rocketmq message");Long memberId = orderMessage.getOrder().getMemberId();Long productId = orderMessage.getOrderItem().getProductId();//订单编号,分库分表不用该编号做订单结果标记String orderSn = orderMessage.getOrder().getOrderSn();Integer limit = orderMessage.getFlashPromotionLimit();Date endDate = orderMessage.getFlashPromotionEndDate();try {Long orderId = secKillOrderService.asyncCreateOrder(orderMessage.getOrder(),orderMessage.getOrderItem(),orderMessage.getFlashPromotionRelationId());//更改排队标记状态,代表已经下单成功,ID设置为snowflake后,用ID作为状态标记redisOpsUtil.set(RedisKeyPrefixConst.MIAOSHA_ASYNC_WAITING_PREFIX + memberId+ ":" + productId,orderId.toString(),60L, TimeUnit.SECONDS);/** 设置用户购买次数,(不限制购买次数了,需要可自行放开此处,* 并在secKillOrderService.checkConfirm中加入验证)*//*Integer rebuy = redisOpsUtil.get(RedisKeyPrefixConst.MEMBER_BUYED_MIAOSHA_PREFIX + memberId + ":" + productId,Integer.class);if(rebuy != null){redisOpsUtil.decr(RedisKeyPrefixConst.MEMBER_BUYED_MIAOSHA_PREFIX + memberId + ":" + productId);}else{//剩余时间Date now = new Date();Long expired = endDate.getTime()-now.getTime();//打上购买次数标记redisOpsUtil.set(RedisKeyPrefixConst.MEMBER_BUYED_MIAOSHA_PREFIX + memberId + ":" + productId,limit-1,expired,TimeUnit.MILLISECONDS);}*/} catch (Exception e) {log.error(e.getMessage(),e.getCause());/** 下单失败*/redisOpsUtil.set(RedisKeyPrefixConst.MIAOSHA_ASYNC_WAITING_PREFIX + memberId+ ":" + productId,Integer.toString(-1),60L, TimeUnit.SECONDS);//还原预减库存secKillOrderService.incrRedisStock(productId);//清除掉本地guava-cache已经售完的标记cache.remove(RedisKeyPrefixConst.MIAOSHA_STOCK_CACHE_PREFIX + productId);//通知服务群,清除本地售罄标记缓存if(secKillOrderService.shouldPublishCleanMsg(productId)) {redisOpsUtil.publish("cleanNoStockCache", productId);}}}}

目前发送端与消费端在一个项目里,实际部署时不能这么玩,需要拆分;

4.2 异步订单查询接口

如果是异步下单的话,则需要定时任务来查询订单状态;

@ApiOperation("根据购物车信息生成订单")
@GetMapping("/miaosha/result")
@ResponseBody
public CommonResult miaoShaResult(@RequestParam("productId") Long productId,@RequestHeader("memberId") Long memberId){String status = redisOpsUtil.get(RedisKeyPrefixConst.MIAOSHA_ASYNC_WAITING_PREFIX + memberId+ ":" + productId);if(ObjectUtils.isEmpty(status)){return CommonResult.success(null,"无正在秒杀中的订单!");}if(status.equals("-1")){return CommonResult.success(status,"秒杀失败!");}if(status.equals("1")){return CommonResult.success(status,"正在排队中,请耐心等待!");}//如果Status>1,则秒杀成功,返回订单编号return CommonResult.success(status);
}

总结:

  1. 异步下单可以分流、让服务器处理的压力变小、数据库压力减少(处理库存与处理订单的业务分开)
  2. 解耦的话,业务更加清晰。
  3. 天然的排队处理能力。
  4. 消息中间件有很多特性可以利用,比如订单取消。

五 订单取消

订单超时取消,回滚库存:
com.tuling.tulingmall.component.rocketmq.RocketMqCancelOrderReciever

import com.tuling.tulingmall.service.OmsPortalOrderService;
import com.tuling.tulingmall.service.SecKillOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** 消费监听rocketmq-订单超时消息* @author yangguo*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.tulingmall.cancelGroup}", topic = "${rocketmq.tulingmall.scheduleTopic}")
public class RocketMqCancelOrderReciever implements RocketMQListener<String> {@Autowiredprivate OmsPortalOrderService omsPortalOrderService;@Autowiredprivate SecKillOrderService secKillOrderService;/*** 延时消息,取消超时订单* @param cancelId*/@Overridepublic void onMessage(String cancelId) {if(StringUtils.isEmpty(cancelId)){return;}Long orderId = Long.parseLong(cancelId.split(":")[0]);Long promotionId = Long.parseLong(cancelId.split(":")[1]);Long productId = Long.parseLong(cancelId.split(":")[2]);try {//取消的订单,释放DB库存omsPortalOrderService.cancelOrder(orderId,promotionId);//取消的订单-还原缓存库存secKillOrderService.incrRedisStock(productId);} catch (Exception e) {log.error("订单取消异常 : 还原库存失败,please check:{}",e.getMessage(),e.getCause());throw new RuntimeException();//抛异常出去,rocketmq会重新投递}}
}

定时任务处理取消订单存在的问题:
1、11点启动定时任务,毎半个小时扫描数据库一次,我们发现在11:01分下的单并不能30分钟之后失效,而是要到12点也就是定时任务第三次扫扫描数据库才能让订单失效。
2、定时扫数据库的话消耗性能也很大,自然效率也会很低。对数据库压力太大。
3、定时任务的话,集群还需要保证处理的幂等性和分布式问题。这也给系统带来了很多的负担。

在这里插入图片描述
下边这种方式,并发的时候,不安全
在这里插入图片描述
加锁:每个定时任务先去拿锁,性能不好
在这里插入图片描述
异步取消订单:
com.tuling.tulingmall.component.rocketmq.RocketMqCancelOrderReciever


import com.tuling.tulingmall.service.OmsPortalOrderService;
import com.tuling.tulingmall.service.SecKillOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;/*** 消费监听rocketmq-订单超时消息* @author yangguo*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.tulingmall.cancelGroup}", topic = "${rocketmq.tulingmall.scheduleTopic}")
public class RocketMqCancelOrderReciever implements RocketMQListener<String> {@Autowiredprivate OmsPortalOrderService omsPortalOrderService;@Autowiredprivate SecKillOrderService secKillOrderService;/*** 延时消息,取消超时订单* @param cancelId*/@Overridepublic void onMessage(String cancelId) {if(StringUtils.isEmpty(cancelId)){return;}Long orderId = Long.parseLong(cancelId.split(":")[0]);Long promotionId = Long.parseLong(cancelId.split(":")[1]);Long productId = Long.parseLong(cancelId.split(":")[2]);try {//取消的订单,释放DB库存omsPortalOrderService.cancelOrder(orderId,promotionId);//取消的订单-还原缓存库存secKillOrderService.incrRedisStock(productId);} catch (Exception e) {log.error("订单取消异常 : 还原库存失败,please check:{}",e.getMessage(),e.getCause());throw new RuntimeException();//抛异常出去,rocketmq会重新投递}}}

创建订单、发送延迟20分钟消息:

com.tuling.tulingmall.service.impl.SecKillOrderServiceImpl#asyncCreateOrder>com.tuling.tulingmall.component.rocketmq.OrderMessageSender#sendTimeOutOrderMessage
/** 如果订单创建成功,需要发送定时消息,20min后如果没有支付,则取消当前订单,释放库存*/
try {boolean sendStatus = orderMessageSender.sendTimeOutOrderMessage(order.getId() + ":" + flashPromotionRelationId + ":" + orderItem.getProductId());if (!sendStatus) {throw new RuntimeException("订单超时取消消息发送失败!");}
} catch (Exception e) {throw new RuntimeException("订单超时取消消息发送失败!");
}

预减库存preDecrRedisStock方法:
通过redis的decr函数扣减库存。
如果没有库存了,stock小于0时 发消息给rocketmq同步库存 redis设置为0
redis与db同步订单:
com.tuling.tulingmall.component.rocketmq.OrderMessageSender#sendStockSyncMessage

/*** 发送延时同步库存消息,60s后同步库存* @param productId* @param promotionId* @return*/public boolean sendStockSyncMessage(Long productId,Long promotionId){Message message = MessageBuilder.withPayload(productId+":"+promotionId).build();SendResult result = rocketMQTemplate.syncSend("stock-sync",message,5000,5);return SendStatus.SEND_OK == result.getSendStatus();}

六 RocketMQ消息

消息零丢失
生产端:同步发送消息、重试机制、事务消息、状态
服务端:刷盘存储(持久化)、主从同步、 状态返回(持久化、主从同步等成功后才返回状态)
消费端:pull broker offset(队列里有offset,偏移量) 消费端完全消费完消息后并且返回成功的情况,才会改变offset偏移值,如果消费失败则下次消费的还是之前那条数据;

RocketMQ消息不被重复消费
由于有重试机制,所以会导致消费重复的问题、也就是幂等性问题。
使用redis incr 自增机制来解决:
假如订单id,orderid为20250101 ,那就把20250101 作为key,处理完后自增为1,即此时value为1;如果下次这个订单又进来了(重复消费),去查value为1,说明已经处理过了,本次就不处理了;

或者,数据库唯一主键也能解决;
数据同步Canal
例如MQ与mysql如何做到数据同步(一般以mysql数据为主);
场景模拟:在秒杀后台把价格修改之后,如何同步到缓存中,比如redis如何同步mysql数据;
Canal是阿里的一款开源产品,canal安装与使用

Canal不适合集成秒杀库存,因为Canal不适合做更新频繁的业务;

项目中对product和秒杀表修改做同步操作:
com.tuling.tulingmall.mq.RefreshCacheListener#onMessage


import com.alibaba.otter.canal.protocol.FlatMessage;
import com.tuling.tulingmall.common.constant.RedisKeyPrefixConst;
import com.tuling.tulingmall.domain.PmsProductParam;
import com.tuling.tulingmall.util.ClassUtil;
import com.tuling.tulingmall.util.RedisOpsUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;/*** @description: 产品表信息修改-同步更新到Redis-Cache,ELK,Hadoop,相关下游服务**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.canal.topic}",consumerGroup = "${rocketmq.canal.group}")
public class RefreshCacheListener implements RocketMQListener<FlatMessage> {@Autowiredprivate RedisOpsUtil redisOpsUtil;private final static String PRODUCT = "pms_product";private final static String SKU = "pms_sku_stock";/*** 异步下单消费消息(由于集成了Canal,所以当数据库数据发生变化时,这里就会收到消息)* @param flatMessage*/@Overridepublic void onMessage(FlatMessage flatMessage) {log.info("database:{},event-type:{},old-row-data:{},new-row-data:{}",flatMessage.getDatabase(),flatMessage.getType(),flatMessage.getOld(),flatMessage.getData());//修改后的新记录List<Map<String, String>> records = flatMessage.getData();//修改前的数据List<Map<String, String>> old = flatMessage.getOld();switch (flatMessage.getType().toUpperCase()){case "UPDATE":updateCache(records,old,flatMessage.getTable());break;case "DELETE":records.stream().forEach((item)->{//删除缓存redisOpsUtil.delete(RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + item.get("id"));});break;}}public void updateCache(List<Map<String, String>> records,List<Map<String, String>> old,String table){int index = 0;/** 被更改的Row所有更改的行*/for(Map<String,String> row : old){Map<String, String> currentRow = records.get(index);String redisKey = RedisKeyPrefixConst.PRODUCT_DETAIL_CACHE + getProductId(currentRow,table);PmsProductParam product = redisOpsUtil.get(redisKey,PmsProductParam.class);if(!ObjectUtils.isEmpty(product)){Iterator<Map.Entry<String, String>> iterator = row.entrySet().iterator();while(iterator.hasNext()){Map.Entry entry = iterator.next();String key = (String) entry.getKey();//刷新产品数据product = refresh(product,table,key,currentRow);}/** 更新缓存内容,并设置过期时间*/long expired = redisOpsUtil.getExpired(redisKey, TimeUnit.SECONDS);redisOpsUtil.set(redisKey,product,expired,TimeUnit.SECONDS);}++index;}}/*** 更新缓存数据* @param product* @param table* @param key* @param currentRow* @return*/private PmsProductParam refresh(PmsProductParam product,String table,String key,Map<String, String> currentRow){if(PRODUCT.equals(table)){ClassUtil.callSetterMethod(product,ClassUtil.getSetterMethodName(key),currentRow.get(key));}else if(SKU.equals(table)){product.getSkuStockList().stream().forEach((item)->{if(item.getId() == Long.parseLong(currentRow.get("id"))){ClassUtil.callSetterMethod(item,ClassUtil.getSetterMethodName(key),currentRow.get(key));}});}return product;}/** 获取产品ID*/private String getProductId(Map<String, String> row,String table){if(PRODUCT.equals(table)){return row.get("id");}else{return row.get("product_id");}}}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/543678.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

hcia复习总结7

1&#xff0c;AR2发送2.0网段的信息给AR1&#xff0c;如果&#xff0c;AR1本身并不存在该网段的路由 信息&#xff0c;则将直接 刷新 到本地的路由表中。 Destination/Mask Proto Pre Cost Flags NextHop Interface 2.2.2.0/24 RIP 100…

有什么好赚钱的副业可以做吗?盘点6个互联网搞钱项目

网上互联网赚钱的项目千千万&#xff0c;每个博主都说自己赚到钱了。很多人既羡慕又慌张&#xff0c;特别幻想自己也月入几万&#xff0c;又怕错过赚钱的机会。那么今天就先为大家简单盘点6个互联网搞钱项目&#xff0c;帮助大家了解一下&#xff0c;也避避坑。这些项目收入可能…

关于Camera出图,有竖线问题的排查步骤

1、问题背景 之前调试的一个项目&#xff0c;在生产过程中&#xff0c;工厂反馈有台设备出图有明显的规则竖条纹&#xff0c;现象如下附件图所示&#xff1a; 遇到此类图像异常的问题该如何去分析呢&#xff0c;这是本文要总结的内容。 2、问题分析 1&#xff09;首先要从客户…

【yocto2】利用yocto工具构建嵌入式Linux系统

1.定制化嵌入式linux系统 在实际项目中&#xff0c;一款嵌入式产品往往具有不同的硬件平台和软件需求&#xff0c;因此需要对嵌入式Linux系统进行定制&#xff0c;以满足不同的产品需求。之前的章节中基于Freescale官方提供的例程&#xff0c;构建了运行于imx6ull14x14evk硬件…

18.古今成大事者,必以多选替身为第一要义——代理模式详解

“杏市而外&#xff0c;尚有何人可以分统?亦须早早提拔。办大事者以多多选替手为第一义&#xff0c;满意之选不可得&#xff0c;姑节取其次&#xff0c;以待徐徐教育可也。 ——曾国藩同治元年四月十二日” 一言 代理模式核心思想是为对象提供一个替身&#xff0c;以控制对这…

029—pandas 遍历行非向量化修改数据

前言 在 pandas 中&#xff0c;向量化计算是指利用 pandas 对象的内置方法和函数&#xff0c;将操作应用到整个数据结构的每个元素&#xff0c;从而在单个操作中完成大量的计算。 但在一些需求中&#xff0c;我们无法使用向量化计算&#xff0c;就需要迭代操作&#xff0c;本例…

汽车电子零部件(4):行泊一体ADAS

前言: 现阶段智能汽车行业正在大规模力推无限接近于L3的L2++或L2.9自动驾驶量产落地,类似于当初智能手机替换传统手机的行业机会期。智能汽车常见的智能驾驶功能包括: 行车场景:自适应巡航控制ACC;自动变道辅助ALC;交通拥堵辅助TJA;车道居中LCC;领航辅助NOA; 泊车场…

kkview远程控制: 内网远程桌面控制软件

内网远程桌面控制软件&#xff1a;高效、安全的远程管理方案 在信息技术日新月异的今天&#xff0c;内网远程桌面控制软件已成为许多企业和个人用户不可或缺的工具。这类软件允许用户通过内部网络&#xff0c;实现对其他计算机的远程访问和控制&#xff0c;从而大大提高工作效…

出现 Duplicate keys detected: ‘0‘. This may cause an update error 解决方法

目录 1. 问题所示2. 原理分析3. 解决方法1. 问题所示 前端测试的时候,在浏览器的控制台输出如下: [Vue warn]: Duplicate keys detected: 0. This may cause an update error.found in---> <Root>截图如下: 2. 原理分析</

双指针 | 移动零 | 复写零

1.移动零 题目描述&#xff1a; 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的相对顺序。 示例&#xff1a; 输入: nums [0,1,0,3,12] 输出: [1,3,12,0,0]解题思路&#xff1a; right指针一直往后移动&#xff0c;当…

SpringBoot-邮件任务

很多时候的网站都有邮件发送功能&#xff0c;下面我们来看看邮件发送功能结合springboot该怎么实现下面的例子我是用的qq邮箱来完成的 1.导入依赖 我的springboot的版本是2.x.x的&#xff0c;如果发现运行不成功&#xff0c;请将版本降低到2.x.x <!--邮件任务--><depe…

HarmonyOS NEXT应用开发—折叠屏音乐播放器方案

介绍 本示例介绍使用ArkUI中的容器组件FolderStack在折叠屏设备中实现音乐播放器场景。 效果图预览 使用说明 播放器预加载了歌曲&#xff0c;支持播放、暂停、重新播放&#xff0c;在折叠屏上&#xff0c;支持横屏悬停态下的组件自适应动态变更。 实现思路 采用MVVM模式进…