黑马点评项目笔记
一:数据交互:
1.把String解析成Java对象集合并且存入Redis及Java对象集合转换成JSON。
@Overridepublic Result queryTypeList() {String s = stringRedisTemplate.opsForValue().get("cache:list:");System.out.println("s = " + s);if(!StrUtil.isBlank(s)){List<ShopType> cachedList = JSONUtil.toList(s, ShopType.class);return Result.ok(cachedList);}LambdaQueryWrapper <ShopType>lambdaQueryWrapper = new LambdaQueryWrapper<>();lambdaQueryWrapper.orderByAsc(ShopType::getSort);List <ShopType> list = shopTypeMapper.selectList(lambdaQueryWrapper);System.out.println("list = " + list);stringRedisTemplate.opsForValue().set("cache:list:", JSONUtils.toJSONString(list));return Result.ok(list);}
2.List存储:
List <String> stringlist= stringRedisTemplate.opsForList().range("cache:list",0,-1);System.out.println("stringlist = " + stringlist);if (stringlist != null && !stringlist.isEmpty()) {List<ShopType> shopTypeList = new ArrayList<>();for (String jsonString : stringlist) {ShopType shopType = JSONUtil.toBean(jsonString, ShopType.class);shopTypeList.add(shopType);}return Result.ok(shopTypeList);}LambdaQueryWrapper <ShopType>lambdaQueryWrapper = new LambdaQueryWrapper<>();lambdaQueryWrapper.orderByAsc(ShopType::getSort);List <ShopType> list = shopTypeMapper.selectList(lambdaQueryWrapper);System.out.println("list = " + list);for (int i = 0; i < list.size(); i++) {stringRedisTemplate.opsForList().rightPush("cache:list", JSONUtil.toJsonStr(list.get(i)));System.out.println(list.get(i));}return Result.ok(list);}
当然,亦可以用Lambda表达式进一步简化:
List <String> stringlist= stringRedisTemplate.opsForList().range("cache:list",0,-1);System.out.println("stringlist = " + stringlist);if (stringlist != null && !stringlist.isEmpty()) {List<ShopType> shopTypeList = stringlist.stream() .map(jsonString -> JSONUtil.toBean(jsonString, ShopType.class)) .collect(Collectors.toList()); return Result.ok(shopTypeList);}LambdaQueryWrapper <ShopType>lambdaQueryWrapper = new LambdaQueryWrapper<>();lambdaQueryWrapper.orderByAsc(ShopType::getSort);List <ShopType> list = shopTypeMapper.selectList(lambdaQueryWrapper);System.out.println("list = " + list);for (int i = 0; i < list.size(); i++) {stringRedisTemplate.opsForList().rightPush("cache:list", JSONUtil.toJsonStr(list.get(i)));System.out.println(list.get(i));}return Result.ok(list);}
注意,前端一般要的是Json数组,在这里,我们只需要一个对象集合就行,Springboot会默认JSON格式返回。
对于下面的代码:
List<ShopType> shopTypeList = stringlist.stream() .map(jsonString -> JSONUtil.toBean(jsonString, ShopType.class)) .collect(Collectors.toList()); return Result.ok(shopTypeList)
- 声明并初始化
shopTypeList
:
List<ShopType> shopTypeList = ...;
这行代码声明了一个ShopType
对象的列表shopTypeList
。它会在后续代码中通过Stream API操作进行初始化。
- 使用Stream API处理
stringlist
:
stringlist.stream()
这行代码将stringlist
(一个包含JSON字符串的列表)转换为一个Stream,以便进行流式处理。
3.映射每个JSON字符串到 ShopType
对象:
.map(jsonString -> JSONUtil.toBean(jsonString, ShopType.class))
使用map
操作,将Stream中的每个JSON字符串元素映射为对应的ShopType
对象。这里JSONUtil.toBean
是一个可以将JSON字符串转换为指定类型对象的方法。
4. 收集结果到新的列表:
.collect(Collectors.toList());
使用collect
操作,将Stream中的元素收集到一个新的列表中。这里使用了Collectors.toList()
作为收集器,它会创建一个新的列表来存储转换后的ShopType
对象。
二:缓存策略:
2.1.更新:
一般我们选用第一种。
对于三,两种都存在问题:
但是,第二种发生问题的概率极低。所以我们用第二种:
@Override
@Transactional
public Result updateshop(Shop shop) {Long id = shop.getId();if (id == null){return Result.fail("店铺id不能为空");}updateById(shop);stringRedisTemplate.delete("cache:shop:" + id);return Result.ok();
}
2.2.穿透:
我们这里用第一种方法:
if(StrUtil.isNotBlank(s)){Shop shop = JSONUtil.toBean(s,Shop.class);return Result.ok(shop);
}
//注意,null不等于""
if(s != null){return Result.fail("店铺不存在。");
}//......if(shop == null){
//写空值
stringRedisTemplate.opsForValue().set("cache:shop:" + id,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);return Result.fail("店铺不存在。");}
2.3.雪崩:
2.4.击穿:
对比:
注意,逻辑过期是假的过期,但是互斥锁是真的没有该数据的缓存了。
互斥锁:
我们用setnx来实现该锁:
private Boolean tryLock(String key){Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);return BooleanUtil.isTrue(result);}private void unLock(String key){stringRedisTemplate.delete(key);}
防止大量缓存穿透及锁的使用代码:
public Shop queryWithLock(Long id){String s = stringRedisTemplate.opsForValue().get("cache:shop:" + id);log.info(s);if(StrUtil.isNotBlank(s)){Shop shop = JSONUtil.toBean(s,Shop.class);return shop;}if(s != null){return null;}//缓存重建String lockKey = "lock:shop:"+id;Shop shop = null;try {if(!tryLock(lockKey)){Thread.sleep(50);return queryWithLock(id);}shop = getById(id);log.info("shop = " +shop);if(shop == null){//写空值stringRedisTemplate.opsForValue().set("cache:shop:" + id,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);return null;}String key = "cache:shop:" + id;stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {unLock(lockKey);}return shop;}
注意,try-catch-finally是保障锁必须被释放的关键。方法调用如下:
@Overridepublic Result queryById(Long id) {//缓存穿透//Shop shop =queryWithPassTrough(id);//互斥锁Shop shop = queryWithLock(id);if(shop == null){return Result.fail("店铺不存在。");}return Result.ok(shop);}
逻辑过期:
首先是怎么实现设置逻辑过期:
@Data
public class RedisData {private LocalDateTime expireTime;private Object data;
}
然后我们接下来模拟一下将热点事件提前写入redis:
public void saveShopRedis(Long id,Long expires){Shop shop = getById(id);RedisData redisData = new RedisData();redisData.setData(shop);redisData.setExpireTime(LocalDateTime.now().plusSeconds(expires));stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));}
先给出独立线程代码:
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
- ExecutorService: 这是Java的并发包
java.util.concurrent
中的一个接口,它代表了一个用于执行任务的线程池。线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的ThreadFactory创建一个新线程。 - CACHE_REBUILD_EXECUTOR: 这是我们创建的ExecutorService对象的变量名。根据命名,它可能用于重建或刷新某种缓存。
- Executors.newFixedThreadPool(10): 这是创建线程池的方法。
Executors
是Java中的一个工具类,它提供了创建线程池的静态方法。newFixedThreadPool(10)
创建了一个固定大小的线程池,其中包含10个线程。这意味着无论提交多少任务,线程池中的线程数量都不会超过10个。当提交的任务数超过线程数时,任务会在队列中等待,直到线程空闲并可以处理它们。
简而言之,这行代码定义了一个私有的、静态的、不可变的ExecutorService
对象,该对象是一个包含10个线程的固定线程池,用于处理与缓存重建相关的任务。
接下来就是业务代码:
public Shop queryWithExpire(Long id){String s = stringRedisTemplate.opsForValue().get("cache:shop:" + id);log.info(s);if(StrUtil.isBlank(s)){return null;}RedisData bean = JSONUtil.toBean(s, RedisData.class);JSONObject object = (JSONObject) bean.getData();Shop shop = JSONUtil.toBean(object, Shop.class);LocalDateTime expirationTime = bean.getExpireTime();if(expirationTime.isAfter(LocalDateTime.now())){return shop;}//缓存重建String lockKey = LOCK_SHOP_KEY + id;boolean isLock = tryLock(lockKey);if(isLock){CACHE_REBUILD_EXECUTOR.submit(()->{try {saveShopRedis(id,20L);} catch (Exception e) {throw new RuntimeException(e);} finally {unLock(lockKey);}});}return shop;}
记得在finally中释放锁。
2.5.自定义工具类:
@Slf4j
@Component
public class CacheClient {private final StringRedisTemplate stringRedisTemplate;private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);public CacheClient(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public void set(String key, Object value, Long time , TimeUnit unit) {stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);}private Boolean tryLock(String key){Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);return BooleanUtil.isTrue(result);}private void unLock(String key){stringRedisTemplate.delete(key);}public void setWithLogicalExpire (String key, Object value, Long time , TimeUnit unit) {RedisData redisData = new RedisData();redisData.setData(value);redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));}//穿透代码public <R,ID> R queryWithPassTrough (String keyPrefix, ID id, Class<R> type, Function<ID,R>dbFallback,Long time,TimeUnit unit) {String s = stringRedisTemplate.opsForValue().get(keyPrefix + id);log.info(s);if(StrUtil.isNotBlank(s)){return JSONUtil.toBean(s,type);}if(s != null){return null;}R r = dbFallback.apply(id);if(r == null){//写空值stringRedisTemplate.opsForValue().set(keyPrefix + id,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);return null;}String key = keyPrefix + id;this.set(key,r,time,unit);return r;}public <R,ID>R queryWithExpire (String keyPrefix,ID id,Class<R> type,Function<ID,R>dbFallback,Long time,TimeUnit unit){String s = stringRedisTemplate.opsForValue().get(keyPrefix + id);log.info(s);if(StrUtil.isBlank(s)){return null;}RedisData bean = JSONUtil.toBean(s, RedisData.class);JSONObject object = (JSONObject) bean.getData();R r = JSONUtil.toBean(object, type);LocalDateTime expirationTime = bean.getExpireTime();if(expirationTime.isAfter(LocalDateTime.now())){return r;}//缓存重建String lockKey = LOCK_SHOP_KEY + id;boolean isLock = tryLock(lockKey);if(isLock){CACHE_REBUILD_EXECUTOR.submit(()->{try {R apply = dbFallback.apply(id);this.setWithLogicalExpire(keyPrefix + id,apply,time,unit);} catch (Exception e) {throw new RuntimeException(e);} finally {unLock(lockKey);}});}return r;}
}
补充:
这里的StringRedisTemplate没有调用set方法,也没有注解,但是依然实现了依赖注入,这是因为在spring中,如果注入对象,被注入对象都加入IOC容器,并且有一个单一的构造函数,那么就可以实现不需要注解与配置的依赖注入。
使用举例:
@Overridepublic Result queryById(Long id) {Shop shop = cacheClient.queryWithPassTrough(CACHE_SHOP_KEY,id,Shop.class,this::getById,CACHE_SHOP_TTL,TimeUnit.MINUTES);if(shop == null){return Result.fail("店铺不存在。");}return Result.ok(shop);}
注意,如果是缓存击穿,一定要数据预热。
三:秒杀业务:
在这种业务中,我们用全局ID生成器来生成ID:
我们用redis来处理,可以满足要求。无论在哪里操作数据库,都是redis。
常见全局唯一ID策略:
注意,这里的数据库自增并不是数据库单纯的自增,因为我们知道,在后面的学习中,是分布式的,到时候,不同的线程可能会导致乱序,重序,这里是单独的那另外一张表做自增,其他的线程都从这里取id。
下面我们来看看基于redis自增的策略:
public class RedisIDWorker {public static final long BEGIN_TIMESTAMP = 1640995200L;private static final int COUNT_BITS = 32;private StringRedisTemplate stringRedisTemplate;public RedisIDWorker (StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId (String keyPrefix) {LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));long count = stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date);return timestamp << COUNT_BITS | count;}
}
increment(String key): 这个方法是对指定的 key 进行自增操作。如果该 key 不存在,那么它的初始值会被设为 0,然后进行自增,结果为 1。如果 key 存在且其值可以转换为整数,则该值会进行自增。
3.2.超卖问题:
但是,这里的票的变化是与版本同步的,所以我们没有必要用版本号,直接用票作为乐观锁。
CAS是指"比较和交换"(Compare and Swap)的缩写,是一种原子操作,通常用于多线程编程中实现同步。CAS操作包括三个参数:需要进行操作的内存位置、旧的预期值和新的值。CAS操作会比较内存位置的当前值与旧的预期值,如果相同则将内存位置的值更新为新的值,否则不做任何操作。CAS操作是一种乐观锁的实现方式,可以避免使用传统的锁机制带来的性能开销和死锁问题。CAS操作在Java中的实现包括AtomicInteger、AtomicLong等类。
@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {SeckillVoucher byId = seckillVoucherService.getById(voucherId);if(byId.getBeginTime().isAfter(LocalDateTime.now())){return Result.fail("秒杀还没开始");}if(byId.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒杀已经结束");}if(byId.getStock()<=0){return Result.fail("库存不足");}boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).eq("stock",byId.getStock()).update();if(!success){return Result.fail("库存不足");}VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setVoucherId(voucherId);long orderId = redisIDWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(UserHolder.getUser().getId());save(voucherOrder);return Result.ok(orderId);}
}
注意,乐观锁也有弊端,他在多个线程同时买票时只能允许一个线程成功,其他线程都会失败,所以我们要解决这个问题:
boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).gt("stock",0).update();
.eq("voucher_id", voucherId)
:- 这是一个条件语句,表示只更新那些
voucher_id
字段值等于voucherId
的记录。voucherId
应该是一个变量,存储了要更新的优惠券的ID。
- 这是一个条件语句,表示只更新那些
.gt("stock",0)
:- 这是另一个条件语句,表示只更新那些
stock
字段值大于0的记录。gt
是“greater than”的缩写,所以这个条件确保不会更新那些库存已经为0的记录。
- 这是另一个条件语句,表示只更新那些
.update();
:- 最后,调用
update
方法执行实际的更新操作。
- 最后,调用
注意,这里事实上是利用了数据库自带的行锁,当某个事务尝试更新某一行记录时,数据库会为该行加上行锁,阻止其他事务同时修改该行,直到当前事务完成并释放锁。
3.3.一人一单:
Long userid = UserHolder.getUser().getId();
long count = query().eq("user_id",userid).eq("voucher_id",voucherId).count();if(count > 0){return Result.fail("您已经购买过一次!");}
但是依然有问题,用户多线程操作时,依然会存在都判断通过的问题,所以我们先把代码分成几个方法,然后加锁:
@Transactionalpublic Result createVoucher(Long voucherId){Long userid = UserHolder.getUser().getId();synchronized(userid.toString().intern()) {long count = query().eq("user_id", userid).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("您已经购买过一次!");}VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setVoucherId(voucherId);long orderId = redisIDWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(UserHolder.getUser().getId());save(voucherOrder);return Result.ok(orderId);}}
这里有两个细节:
1.userid.toString().intern()
,当处理一个用户的多个请求时,如果只用toString()
,该方法在Java中是new一个新的对象,那么即使是同一个用户,可能他的多次请求的锁都不一样,这是有问题的,但是intern()
是在字符串常量池中判断有没有这个这个,如果有,就用这个对象。
2.不能在方法上加锁:因为所有的线程都走一个方法,可能会导致其他问题。
但是这样还是有问题:
因为这里又有事务又有锁,锁在事务的内部,如果锁先释放,事务还没有提交,然后其他线程去查询,有可能查询不到,就会存在问题:
所以我们把锁加在外面:
@Transactional
public Result createVoucher(Long voucherId){Long userid = UserHolder.getUser().getId();long count = query().eq("user_id", userid).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("您已经购买过一次!");}VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setVoucherId(voucherId);long orderId = redisIDWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(UserHolder.getUser().getId());save(voucherOrder);return Result.ok(orderId);
}
Long userid = UserHolder.getUser().getId();
synchronized(userid.toString().intern()) {return createVoucher(voucherId);}
但是这样还是有问题:
在外部同步代码块中直接调用带有@Transactional注解的方法可能会导致事务管理不生效的原因,主要是因为Spring的AOP(面向切面编程)代理机制。@Transactional注解的事务管理是通过Spring的AOP代理来实现的,具体来说:
代理模式:**Spring使用代理(通常是JDK动态代理或者CGLIB代理)来拦截那些标有@Transactional注解的方法调用。**当一个被代理的对象调用这些方法时,Spring会在调用前后插入事务管理的逻辑,比如开始事务、提交或回滚事务。
自我调用问题:在同一个类内部,如果一个方法直接调用另一个带有@Transactional注解的方法,这种调用是“直接调用”,而非通过代理对象进行。因此,Spring的AOP代理无法拦截到这种内部调用,从而不会触发事务管理的行为。
在代码示例中,虽然createVoucher方法上有@Transactional注解,但如果它是直接在类内部通过同步代码块被调用,那么这个直接调用就绕过了Spring的AOP代理,事务管理因此可能不会生效。
为了解决这个问题,确保事务管理能够正常工作,可以考虑以下几种方案:
1.通过代理对象调用:如果必须在类内部调用事务方法,可以通过注入当前类的bean实例来间接调用,这样Spring的AOP代理就能生效。重构方法结构:将事务逻辑和同步逻辑整合到一个方法中,确保事务和同步都在同一个由Spring管理的方法体内执行。
2.使用AspectJ编织:Spring也支持使用AspectJ来实现更细粒度的切面编织,包括对类内部方法调用的拦截,但这需要额外的配置和可能的编译期织入。
综上所述,直接在同步代码块中调用事务方法的问题在于它规避了Spring AOP代理,从而可能忽略了事务管理的预期效果。
修改如下:
Long userid = UserHolder.getUser().getId();synchronized(userid.toString().intern()) {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucher(voucherId);}
AopContext.currentProxy(): 这是Spring提供的一个方法,用于在AOP代理方法内部获取当前代理对象。这一步很关键,因为它允许在类内部调用时仍然通过代理来执行,从而确保了@Transactional注解的事务管理能够生效。通过代理调用createVoucher: 通过强制类型转换得到的代理对象proxy来调用createVoucher方法,而不是直接调用。这样做使得事务管理逻辑能够被Spring的AOP代理捕获并正确执行,包括事务的开始、提交或回滚。
导入依赖:
<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency>
暴露代理对象,在启动类上面加上注解:
@EnableAspectJAutoProxy(exposeProxy = true)
集群的并发问题:
每个jvm内部共享一个synchronized锁监视器。如果在同一个tomcat,那肯定是没有问题的,但是如果是多个服务器,就有多个tomcat,JVM,就会出现问题。
3.4.分布式锁:
所以我们使用Redis。
为了避免释放错误,所以我们加上过期时间。注意,设置过期时间与setnx是两个语句,我们必须确保它们具有原子性。所以我们可以用set语句:
set lock k1 EX 10 NX
我们用非阻塞式实现这个操作。
public class SimpleRedisLock implements ILock{private StringRedisTemplate stringRedisTemplate;private String name;private static final String KEY_PREFIX = "lock:";public SimpleRedisLock(String name,StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic void unLock() {stringRedisTemplate.delete(KEY_PREFIX+name);}@Overridepublic boolean tryLock(long timeoutSec) {long id = Thread.currentThread().getId();Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,id+"",timeoutSec, TimeUnit.MINUTES);return Boolean.TRUE.equals(success);}
}
使用方法:
Long userid = UserHolder.getUser().getId();SimpleRedisLock lock= new SimpleRedisLock("order"+userid,stringRedisTemplate);boolean isLock = lock.tryLock(1200);if(!isLock){return Result.fail("不允许重复下单");}try {synchronized(userid.toString().intern()) {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucher(voucherId);}} finally {lock.unLock();}
可能存在的问题:
所以,要判断这个锁是不是自己的,不要把别人的锁释放了。
注意,不要用本身的线程Id作为判断标准,因为不同的JVM有不同的线程Id,代码改进如下:
private static final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";//......其他代码@Overridepublic void unLock() {String id = ID_PREFIX + Thread.currentThread().getId();String lock = stringRedisTemplate.opsForValue().get(KEY_PREFIX+name);if(id.equals(lock)) {stringRedisTemplate.delete(KEY_PREFIX + name);}}@Overridepublic boolean tryLock(long timeoutSec) {String id = ID_PREFIX + Thread.currentThread().getId();Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,id+"",timeoutSec, TimeUnit.MINUTES);return Boolean.TRUE.equals(success);}
但是事实上,还是有问题的(md,怎么这么多问题):
下面是一个极端情况:
如果线程一判断相同通过了,但是删除锁的时候被阻塞了,还是会删除其他线程的锁。但是为什么这样的情况我们可以删掉呢:
注意这里存入redis的key,是前缀加用户ID,而前缀,后缀都是static修饰的,在同一个服务器内前缀后缀是一样的,而我们这里解决的是一人一单,也就是说,用户的ID也是一样的,因此在同一台服务器下的同一个用户的key就是固定的。其次,我们上面将后缀加上线程设置为key值,这是为了应对多台服务器下的问题,而我们用的redis,也是为了解决多台服务器下的并发问题,因此在这里,反而在同一台服务器下,在判断通过后删除之前的间隙中,删除动作迟迟不执行,然后老线程的key过期了,然后另外一个线程创建了一个相同的key,此时,老线程执行删除动作,就会删除掉新线程的key。注意,删除是按照key的!!!
所以我们引入Lua脚本:
新建一个unlock.lua文件:
if(redis.call('get',KEYS[1]) == ARGV[1]) thenreturn redis.call('del',KEYS[1])
end
return 0
然后使用静态代码块加载脚本文件:
private static final DefaultRedisScript<Long>UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}
使用脚本:
@Overridepublic void unLock() {stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX+name),ID_PREFIX+Thread.currentThread().getId());}
3.5.分布式锁优化——Redisson:
因为自己写实在是太烦琐了,所以我们引入了其他第三方服务:
导入依赖:
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>
配置客户端:
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient() {Config config = new Config();config.useSingleServer().setAddress("redis://192.168.6.128").setPassword("kz32330981");return Redisson.create(config);}
}
使用分布式锁:
举例:
@Overridepublic Result seckillVoucher(Long voucherId) {SeckillVoucher byId = seckillVoucherService.getById(voucherId);if(byId.getBeginTime().isAfter(LocalDateTime.now())){return Result.fail("秒杀还没开始");}if(byId.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒杀已经结束");}if(byId.getStock()<=0){return Result.fail("库存不足");}boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).gt("stock",byId.getStock()).update();if(!success){return Result.fail("库存不足");}Long userid = UserHolder.getUser().getId();//SimpleRedisLock lock= new SimpleRedisLock("order"+userid,stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order:" + userid);boolean isLock = lock.tryLock();if(!isLock){return Result.fail("不允许重复下单");}try {synchronized(userid.toString().intern()) {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucher(voucherId);}} finally {lock.unlock();}}
这里boolean isLock = lock.tryLock();
没有设置参数,默认失败一次就马上返回。
3.6.Redissson可重入锁原理:
为什么一般情况下不能实现可重入锁:
string只能有一个标识,当我们实现可重入锁时,如果是当前的线程,我们把它的value++,释放锁的时候–,所以我们需要两个标识,所以我们使用hash:
Redisson内部也是这么实现的。
3.6.Redissson可重试与避免超时释放原理:
注意,这里为null说明获取锁成功了。为null说明没有存在这个锁,所以当然会成功了。如果失败,返回的是当前存在的锁的有效期。
重试机制:
在Redisson中,可重试机制通常与分布式锁的实现紧密相关。当客户端尝试获取分布式锁但失败时(如锁已被其他客户端持有),Redisson会利用Redis的发布/订阅机制来实现锁的可重试。下面我将简要解释Redisson是如何结合消息订阅来实现可重试机制的:
- 锁获取失败与等待
- 当客户端尝试获取分布式锁但失败时(比如调用
lock.tryLock(waitTime, leaseTime, TimeUnit.unit)
方法),Redisson会记录当前线程并启动一个定时任务或监听器来等待锁的释放。
- 当客户端尝试获取分布式锁但失败时(比如调用
- 消息订阅
- 为了实现等待锁的释放,Redisson会利用Redis的发布/订阅机制。具体来说,Redisson会为每个锁维护一个特定的频道(channel)或主题(topic)。
- 当客户端尝试获取锁但失败时,它会订阅该锁的频道。这样,一旦锁的持有者释放了锁,Redisson就会在该频道上发布一个消息。
- 监听器与重试
- 客户端在订阅了锁的频道后,会设置一个监听器来监听该频道上的消息。
- 当锁的持有者释放锁并在频道上发布消息时,监听器会立即收到通知。
- 监听器在收到通知后,会触发一个重试机制,即再次尝试获取锁。
避免超时释放:
-
Redis中的看门狗策略通常与Redisson库一起使用,用于自动检测并处理过期键的机制。
-
应用程序使用Redisson库监视Redis服务器上的指定键。当这些键被修改时,看门狗策略会自动触发相应的操作,如更新本地缓存或重新计算依赖项。
-
这有助于应用程序实时响应Redis中的数据变化,并防止在数据发生变化时出现问题,如死锁和其他并发问题。
-
具体来说,
leaseTime
是锁的有效期,也就是锁的持有时间。当调用lock(leaseTime, unit)
方法并传入一个非-1的leaseTime
时,这个特定的锁将会在一个固定的时间后自动释放,即使锁的持有者仍然在运行。在这种情况下,Redisson不会启动看门狗机制,因为锁的过期时间是由应用程序明确指定的。然而,如果调用lock()
方法(没有传入leaseTime
参数),或者传入-1
作为leaseTime
的值,那么锁将没有固定的过期时间。此时,Redisson会启动看门狗机制。看门狗会定期检查锁的持有者是否仍然活跃,并在锁即将过期时自动延长锁的有效期,从而确保锁不会被意外释放,直到锁的持有者显式地释放锁或者持有者不再活跃。
注意,只用我们没有设置时间(或者-1)才会触发这个策略。
3.7.主从一致性:
为什么以前会出现问题:
当主节点出现问题时,从从节点获取锁就会成功。
而Redisson是怎么解决这个问题的呢:
以下是如何使用Redisson的MultiLock
的基本步骤:
-
配置Redisson客户端:
首先,你需要配置Redisson客户端以连接到Redis集群或哨兵模式,如之前所述。 -
创建RLock对象:
对于每个Redis节点,你需要创建一个RLock
对象。这些RLock
对象将用于组成MultiLock
。RLock lock1 = redisson1.getLock("lock1"); // 假设redisson1连接到第一个Redis节点 RLock lock2 = redisson2.getLock("lock2"); // 假设redisson2连接到第二个Redis节点 // ... 为其他节点创建RLock对象 ...
注意:虽然锁的名称(如
"lock1"
和"lock2"
)可以不同,但出于一致性和可维护性的考虑,最好使用相同的锁名称。 -
组合RLock对象为MultiLock:
使用这些RLock
对象创建一个MultiLock
。List<RLock> locks = Arrays.asList(lock1, lock2, /* ... 其他locks ... */); RLock multiLock = redisson.getMultiLock(locks);
-
使用MultiLock:
现在你可以像使用普通的RLock
一样使用MultiLock
了。multiLock.lock(); try { // 临界区代码,只有获得锁的线程才能执行 } finally { multiLock.unlock(); }
注意:在
finally
块中解锁是非常重要的,以确保锁总是被释放,即使发生异常。 -
关闭Redisson客户端:
完成所有操作后,确保关闭所有Redisson客户端实例以释放资源。redisson1.shutdown(); redisson2.shutdown(); // ... 关闭其他Redisson客户端 ...
-
测试:
为了测试MultiLock
在多节点环境中的行为,你可以模拟节点故障、网络分区等情况,并观察MultiLock
是否仍然能够正确工作。
总结:
3.7.异步秒杀:
原来的串行执行:
优化如下:
完整的流程如下:
Part 1:保存到redis:增加redis中的优惠券数量:
@Override@Transactionalpublic void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);stringRedisTemplate.opsForValue().set("seckill:stock:" + voucher.getId(), voucher.getStock().toString());}
Part 2:编写Lua脚本:
local voucherId = ARGV[1]
local userId = ARGV[2]
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
if(tonumber(redis.call('get',stockKey))<= 0) thenreturn 1
end
if(redis.call('sismember',orderKey,userId) == 1) thenreturn 2
end
redis.call('incrby',stockKey,-1)
redis.call('sadd',orderKey,userId)
return 0
- 参数定义:
ARGV[1]
:voucherId
,表示优惠券ID。ARGV[2]
:userId
,表示用户的ID。
- 定义键名:
stockKey
:这是优惠券库存的键名,格式为'seckill:stock:'
加上voucherId
。orderKey
:这是已购买(或已下单)该优惠券的用户的集合的键名,格式为'seckill:order:'
加上voucherId
。
- 检查库存:
- 使用
redis.call('get', stockKey)
获取库存数量。 - 使用
tonumber
函数将返回的字符串转换为数字。 - 如果库存数量小于或等于0,则返回
1
,表示库存不足。
- 使用
- 检查用户是否已经购买:
- 使用
redis.call('sismember', orderKey, userId)
检查用户是否已经在orderKey
集合中。 - 如果用户已经购买(即用户ID在集合中),则返回
2
,表示用户已购买。
- 使用
- 扣减库存并记录订单:
- 如果上述两个条件都不满足(即库存充足且用户未购买),则执行以下操作:
- 使用
redis.call('incrby', stockKey, -1)
扣减库存。 - 使用
redis.call('sadd', orderKey, userId)
将用户ID添加到已购买集合中。
- 使用
- 这两个操作是原子性的,因为它们都在同一个Lua脚本中执行。
- 如果上述两个条件都不满足(即库存充足且用户未购买),则执行以下操作:
- 返回值:
- 如果成功扣减库存并记录订单,则返回
0
,表示操作成功。 - 如果库存不足,返回
1
。 - 如果用户已购买,返回
2
。
- 如果成功扣减库存并记录订单,则返回
Part 3:基于阻塞队列的秒杀业务:
private BlockingQueue<VoucherOrder >orderTasks = new ArrayBlockingQueue<>(1024*1024);
@Resource
private RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}@Overridepublic Result seckillVoucher(Long voucherId) {Long resultT =stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),String.valueOf(UserHolder.getUser().getId()));int res = resultT.intValue();if(res!=0){return Result.fail(res==1?"库存不足":"不能重复下单");}long id = redisIDWorker.nextId("order");VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setVoucherId(voucherId);voucherOrder.setId(id);voucherOrder.setUserId(UserHolder.getUser().getId());//在父线程就先获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();orderTasks.add(voucherOrder);return Result.ok(id);}
Part 4:异步下单业务:
private static final ExecutorService seckill_order_executor = Executors.newSingleThreadExecutor();private IVoucherOrderService proxy;private class VoucherOrderHandler implements Runnable{@PostConstructpublic void init(){seckill_order_executor.submit(new VoucherOrderHandler());}@Overridepublic void run() {while (true){try {VoucherOrder voucherOrder = orderTasks.take();handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("处理订单异常",e);}}}
}
-
@PostConstruct注解:
这个注解标记在init()方法上,表示该方法会在类的实例创建并注入到Spring容器后立即调用。这里是用来启动任务处理的逻辑。
-
init()方法:
seckill_order_executor是一个线程池,可能是ExecutorService类型的实例。
方法中提交了一个新的VoucherOrderHandler实例到线程池执行。这意味着每次初始化VoucherOrderHandler,都会启动一个新的线程来处理订单任务,形成一个循环执行的任务队列。 -
run()方法:
作为Runnable接口的实现,run()方法包含实际的业务逻辑。使用一个无限循环while (true),确保线程将持续运行,直到程序停止或线程被显式中断。
-
try-catch块:
在循环中,从orderTasks队列(可能是BlockingQueue类型)中获取一个VoucherOrder对象并调用handleVoucherOrder(voucherOrder)进行处理。如果在处理过程中出现任何异常,捕获并记录错误日志,但不会中断循环。这样可以确保即使处理过程中出现问题,线程仍能继续尝试处理下一个订单。
private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("lock:order:" + userId);boolean isLock = lock.tryLock();if(!isLock){log.error("不允许重复下单");return;}try {proxy.createVoucher(voucherOrder);}finally {lock.unlock();}}
注意这里的两处细节,userId与createVoucher都与父线程有关系,而在这里是异步的子线程,所以代理对象要在父线程就创建好,而userid也不能通过线程本地直接获取。这里用Redisson重复验证是为了让线程更加安全。
@Transactionalpublic void createVoucher (VoucherOrder voucherOrder){Long userid = voucherOrder.getId();long count = query().eq("user_id", userid).eq("voucher_id", voucherOrder.getVoucherId()).count();if (count > 0) {log.error("用户已经购买过一次!");return ;}boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0).update();if(!success){log.error("库存不足");return;}save(voucherOrder);}
重复验证道理同上。
3.8.阻塞队列优化——消息队列:
基于上面的阻塞队列存在两个问题:1.内存问题 2.服务宕机的安全问题,所以我们必须进行优化。
1.独立于JVM,解决内存问题。
2.内部做了数据的持久化,避免安全问题。
(1):基于List
(2):PubSub:
如果没有被任何频道订阅,会直接丢失。
(3):Stream:
发送消息:
读取消息:
3.9.基于Stream的消费者组:
基于Java代码:
总结:
3.10.代码改造:
(1):创建消息队列:
XGROUP CREATE stream.orders g1 0 MKSTREAM
这是创建消费者组,自动创建了队列。
(2):修改Lua脚本:
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
if(tonumber(redis.call('get',stockKey))<= 0) thenreturn 1
end
if(redis.call('sismember',orderKey,userId) == 1) thenreturn 2
end
redis.call('incrby',stockKey,-1)
redis.call('sadd',orderKey,userId)
redis.call('xadd','strem.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
return 0
这行代码是使用 Redis 的 XADD 命令向名为 “stream.orders” 的 Stream 数据结构中添加一条新的消息。该消息包含了三个字段:“userId”、“voucherId” 和 “id”,分别对应传入的参数 userId、voucherId 和 orderId 的值。这样可以将订单相关的信息存储在 Redis 中,并且可以通过 Stream 数据结构的功能对这些信息进行处理和查询。这种方式可以方便地实现消息队列、事件日志等功能。
(3):修改发消息业务代码:
@Override
public Result seckillVoucher(Long voucherId) {
long id = redisIDWorker.nextId("order");
Long resultT =stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),String.valueOf(UserHolder.getUser().getId()),String.valueOf(id)
);
int res = resultT.intValue();
if(res!=0){return Result.fail(res==1?"库存不足":"不能重复下单");
}proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(id);
}
(4):收消息:
private class VoucherOrderHandler implements Runnable{String queueName = "stream.orders";@PostConstructpublic void init(){seckill_order_executor.submit(new VoucherOrderHandler());}@Overridepublic void run() {while (true){try {List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));if(list == null || list.isEmpty()){continue;}//解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);//ackstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理订单异常",e);handlePendinglist();}}}private void handlePendinglist() {while (true){try {List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));if(list == null || list.isEmpty()){break;}//解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);//ackstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理pending异常",e);try {Thread.sleep(20);} catch (InterruptedException interruptedException) {e.printStackTrace();}}}}}
//增加订单业务不变
private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("lock:order:" + userId);boolean isLock = lock.tryLock();if(!isLock){log.error("不允许重复下单");return;}try {proxy.createVoucher(voucherOrder);}finally {lock.unlock();}}
我们分成三个部分来看:
4.1.正常接受处理消息:
String queueName = "stream.orders";@PostConstructpublic void init(){seckill_order_executor.submit(new VoucherOrderHandler());}@Overridepublic void run() {while (true){try {List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));if(list == null || list.isEmpty()){continue;}//解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);//ackstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理订单异常",e);handlePendinglist();}}}
- 参数详解:
- Consumer.from(“g1”, “c1”)
- 这表示从消费者组 “g1” 中的消费者 “c1” 读取数据。在 RedisStreams 中,消费者组用于允许多个消费者并发地读取同一个 Stream。注意,这里因为没有c1,所以先创建了c1。
- StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2))
StreamReadOptions.empty()
创建一个空的读取选项对象。.count(1)
设置读取的条目数量为 1。这意味着,无论 Stream 中有多少未读的数据,此方法仅返回一个条目。.block(Duration.ofSeconds(2))
设置读取操作的阻塞时间。如果 Stream 中没有新的数据可读,那么这个方法会阻塞最多 2 秒,然后返回一个空列表。
- StreamOffset.create(queueName, ReadOffset.lastConsumed())
- 这定义了从哪个位置开始读取 Stream。
queueName
是 Stream 的名称。ReadOffset.lastConsumed()
表示从消费者 “c1” 最后消费的位置开始读取。如果消费者 “c1” 还没有消费过任何数据,那么这个位置可能是 Stream 的开始位置或任何其他预定义的位置(例如 Stream 的起始点)。
- Consumer.from(“g1”, “c1”)
等价于:
XREADGROUP GROUP g1 c1 COUNT 1 STREAMS mystream >
4.2.处理因异常而位于pending-list的消息:
private void handlePendinglist() {while (true){try {List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));if(list == null || list.isEmpty()){break;}//解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);//ackstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理pending异常",e);try {Thread.sleep(20);} catch (InterruptedException interruptedException) {e.printStackTrace();}}}}
与上面同理,只是这里是获取未确认的消息,这里的异常不做递归处理,因为外面本身就是true循环。
4.3.新增:
//增加订单业务不变
private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("lock:order:" + userId);boolean isLock = lock.tryLock();if(!isLock){log.error("不允许重复下单");return;}try {proxy.createVoucher(voucherOrder);}finally {lock.unlock();}}
四:其他业务:
4.1.不存在该表中字段处理方法如下,后续需要自己维护:
@TableField(exist = false)
private String icon;
/*** 用户姓名*/
@TableField(exist = false)
private String name;
4.2.点赞排序:
改造前:
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
//......
stringRedisTemplate.opsForSet().add(key,userId.toString());
//......
stringRedisTemplate.opsForSet().remove(key,userId.toString());
改造后:
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
//...
stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
//......
stringRedisTemplate.opsForZSet().remove(key,userId.toString());
部分代码如下:
@Overridepublic Result queryBlogLikes(Long id) {String key = "blog:liked:" + id;Set <String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);if(range == null || range.isEmpty()){return Result.ok(Collections.emptyList());}List<Long> ids = range.stream().map(Long::valueOf).collect(Collectors.toList());List <UserDTO> users = userService.listByIds(ids).stream().map(user-> BeanUtil.copyProperties(user,UserDTO.class)).collect(Collectors.toList());return Result.ok(users);}
注意,这里有个问题,取出的结果是有序的,都是我们在进行userService.listByIds(ids)
时,数据库内部用的是WHERE id IN(1,5)
,而它在数据库内部的索引中,就优化了,导致无论怎么样,都是先1再5。可以用下面的sql语句优化:
在MP中,就可以这么写:
@Override
public Result queryBlogLikes(Long id) {String key = "blog:liked:" + id;Set <String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);if(range == null || range.isEmpty()){return Result.ok(Collections.emptyList());}List<Long> ids = range.stream().map(Long::valueOf).collect(Collectors.toList());String idStr = StrUtil.join(",", ids);List <UserDTO> users = userService.query().in("id",ids).last("ORDER BY FIELD(id,"+idStr+")").list().stream().map(user-> BeanUtil.copyProperties(user,UserDTO.class)).collect(Collectors.toList());return Result.ok(users);
}
4.3.共同关注:
基于Set集合的交集:
@Overridepublic Result followCommons(Long id) {Long userId = UserHolder.getUser().getId();Set<String> intersect = stringRedisTemplate.opsForSet().intersect("follows:" + userId, "follows:" + id);if(intersect==null||intersect.isEmpty()){return Result.ok(Collections.emptyList());}List<Long> collect = intersect.stream().map(Long::valueOf).collect(Collectors.toList());List<UserDTO> userDTOS = userService.listByIds(collect).stream().map(user-> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(userDTOS);}
4.4.关注推送——Feed流
以下是基于推模式:
分页问题:
滚动查询:
代码实现如下:
@Overridepublic Result queryBlogOfFollow(Long max, Integer offset) {Long userId = UserHolder.getUser().getId();String key = "feed:" + userId;Set <ZSetOperations.TypedTuple <String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);if(typedTuples == null || typedTuples.isEmpty()){return Result.ok();}List<Long>ids = new ArrayList<>(typedTuples.size());long minTime = 0;int os = 1;for(ZSetOperations.TypedTuple <String> tuple : typedTuples){ids.add(Long.valueOf(tuple.getValue()));long time = tuple.getScore().longValue();if(time == minTime){os++;}else{minTime = time;os = 1;}}List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + StrUtil.join(",", ids) + ")").list();for (Blog blog : blogs) {queryBlogUser(blog);queryBlogLikes(blog);}ScoreResult r = new ScoreResult();r.setList(blogs);r.setMinTime(minTime);r.setOffset(os);return Result.ok(r);}
下面是关键代码解释:
Set <ZSetOperations.TypedTuple <String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);
参数
key
: 这是 Redis 中有序集合的键名。在这个例子中,它是通过"feed:" + userId
构建的,其中userId
是从UserHolder
获取的当前用户的 ID。0
: 分数范围查询的起始分数(包含)。在这个例子中,查询从分数 0 开始。max
: 分数范围查询的结束分数(不包含)。这是你要查询的最高分数(不包括这个分数)。offset
: 分页查询的偏移量。这表示从有序集合中跳过多少个元素后再开始返回结果。2
: 这是一个限制参数,表示最多返回 2 个元素。然而,在实际应用中,这个限制可能不是非常有用,因为它会限制返回结果的数量。通常,你可能会根据分页参数(如每页大小)来动态设置这个值。
在这个例子中,通过 reverseRangeByScoreWithScores(key, 0, max, offset, 2)
,你正在查询从分数 0 到 max
(不包含)之间的博客,按分数从高到低排序,并跳过 offset
个元素,最后最多返回 2 个结果。每个结果都是一个 TypedTuple
,它包含博客的 ID 和其发布时间(作为分数)。
注意,分布时间越前的,id越小,所以从零开始,而且这里的os,及偏移量,是上次发布时间最早的时间的重复个数,不是在所有结果中的个数,比如时间戳为4 4 4 3,每次两个,第一次查出4与4,4虽然有三个,但是这里记录的是上一次查询中重复的最小时间,及两个,下一次,就从0到最大时间4,并且跳过2个,即第三个4开始查询。
4.5.附近商铺:
我们可以先做数据预热,先在test中存入redis:
@Testpublic void test(){List<Shop> list = shopService.list();Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));for(Map.Entry<Long, List<Shop>> entry : map.entrySet()){Long typeId = entry.getKey();String key = "shop:geo:" + typeId;List<Shop> value = entry.getValue();for(Shop shop : value){stringRedisTemplate.opsForGeo().add(key,new org.springframework.data.geo.Point(shop.getX(),shop.getY()),shop.getId().toString());}}
}
或者可以这么写:
代码如下:
@Overridepublic Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {if (x == null || y == null){// 根据类型分页查询Page<Shop> page = query().eq("type_id", typeId).page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));// 返回数据return Result.ok(page.getRecords());}int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;int end = (current + 1) * SystemConstants.DEFAULT_PAGE_SIZE;String key = SHOP_GEO_KEY + typeId;GeoResults<RedisGeoCommands.GeoLocation<String> >search = stringRedisTemplate.opsForGeo().search(key,GeoReference.fromCoordinate(x, y),new Distance(5000),RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end));if(search == null){return Result.ok();}List<GeoResult<RedisGeoCommands.GeoLocation<String>>>contents = search.getContent();if(contents.size()<=from){return Result.ok();}//截取List <Long> ids = new ArrayList<>(contents.size());Map<String,Distance>distanceMap = new HashMap<>(contents.size());contents.stream().skip(from).forEach(result->{String shopIdStr = result.getContent().getName();ids.add(Long.valueOf(shopIdStr));Distance distance = result.getDistance();distanceMap.put(shopIdStr,distance);});List<Shop> shops = query().in("id",ids).last("ORDER BY FIELD(id," + StrUtil.join(",", ids) + ")").list();for (Shop shop : shops) {shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());}return Result.ok(shops);}
4.6.用户签到:
@Overridepublic Result sign() {Long userId = UserHolder.getUser().getId();LocalDateTime now = LocalDateTime.now();String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String key = USER_SIGN_KEY + userId + keySuffix;int day = now.getDayOfMonth() - 1;stringRedisTemplate.opsForValue().setBit(key, day, true);return Result.ok();}
String keySuffix = now.format(DateTimeFormatter.ofPattern(“:yyyyMM”));: 根据当前日期的年月(格式为"yyyyMM",例如"202302")创建一个字符串后缀,用于构建Redis键。
int day = now.getDayOfMonth() - 1;: 获取当前日期减1(因为数组或位数组通常从0开始计数,所以这里将日期减1来对应位数组的索引)。
stringRedisTemplate.opsForValue().setBit(key, day, true);: 使用Spring Data Redis的StringRedisTemplate操作位值。在Redis的键key对应的位数组中,设置第day位为true,表示用户在这一天签到了。
连续签到:
@Overridepublic Result signCount() {Long userId = UserHolder.getUser().getId();LocalDateTime now = LocalDateTime.now();String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String key = USER_SIGN_KEY + userId + keySuffix;int day = now.getDayOfMonth();List <Long> result = stringRedisTemplate.opsForValue().bitField(key,BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(day)).valueAt(0));if(result == null || result.isEmpty()){return Result.ok(0);}Long num = result.get(0);if(num == null || num == 0){return Result.ok(0);}int count = 0;while (true){if((num & 1) == 0){break;}else {count++;}num >>>= 1;}return Result.ok(count);}
关键代码如下:
List <Long> result = stringRedisTemplate.opsForValue().bitField(key,BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(day)).valueAt(0));