Redis实战笔记

黑马点评项目笔记

一:数据交互:

1.把String解析成Java对象集合并且存入Redis及Java对象集合转换成JSON。

 @Overridepublic Result queryTypeList() {String s = stringRedisTemplate.opsForValue().get("cache:list:");System.out.println("s = " + s);if(!StrUtil.isBlank(s)){List<ShopType> cachedList = JSONUtil.toList(s, ShopType.class);return Result.ok(cachedList);}LambdaQueryWrapper <ShopType>lambdaQueryWrapper = new LambdaQueryWrapper<>();lambdaQueryWrapper.orderByAsc(ShopType::getSort);List <ShopType> list = shopTypeMapper.selectList(lambdaQueryWrapper);System.out.println("list = " + list);stringRedisTemplate.opsForValue().set("cache:list:", JSONUtils.toJSONString(list));return Result.ok(list);}

2.List存储:

List <String> stringlist= stringRedisTemplate.opsForList().range("cache:list",0,-1);System.out.println("stringlist = " + stringlist);if (stringlist != null && !stringlist.isEmpty()) {List<ShopType> shopTypeList = new ArrayList<>();for (String jsonString : stringlist) {ShopType shopType = JSONUtil.toBean(jsonString, ShopType.class);shopTypeList.add(shopType);}return Result.ok(shopTypeList);}LambdaQueryWrapper <ShopType>lambdaQueryWrapper = new LambdaQueryWrapper<>();lambdaQueryWrapper.orderByAsc(ShopType::getSort);List <ShopType> list = shopTypeMapper.selectList(lambdaQueryWrapper);System.out.println("list = " + list);for (int i = 0; i < list.size(); i++) {stringRedisTemplate.opsForList().rightPush("cache:list", JSONUtil.toJsonStr(list.get(i)));System.out.println(list.get(i));}return Result.ok(list);}
当然,亦可以用Lambda表达式进一步简化:
List <String> stringlist= stringRedisTemplate.opsForList().range("cache:list",0,-1);System.out.println("stringlist = " + stringlist);if (stringlist != null && !stringlist.isEmpty()) {List<ShopType> shopTypeList = stringlist.stream()  .map(jsonString -> JSONUtil.toBean(jsonString, ShopType.class))  .collect(Collectors.toList());  return Result.ok(shopTypeList);}LambdaQueryWrapper <ShopType>lambdaQueryWrapper = new LambdaQueryWrapper<>();lambdaQueryWrapper.orderByAsc(ShopType::getSort);List <ShopType> list = shopTypeMapper.selectList(lambdaQueryWrapper);System.out.println("list = " + list);for (int i = 0; i < list.size(); i++) {stringRedisTemplate.opsForList().rightPush("cache:list", JSONUtil.toJsonStr(list.get(i)));System.out.println(list.get(i));}return Result.ok(list);}
注意,前端一般要的是Json数组,在这里,我们只需要一个对象集合就行,Springboot会默认JSON格式返回。
对于下面的代码:
List<ShopType> shopTypeList = stringlist.stream()  .map(jsonString -> JSONUtil.toBean(jsonString, ShopType.class))  .collect(Collectors.toList());  return Result.ok(shopTypeList)
  1. 声明并初始化 shopTypeList
List<ShopType> shopTypeList = ...;

这行代码声明了一个ShopType对象的列表shopTypeList。它会在后续代码中通过Stream API操作进行初始化。

  1. 使用Stream API处理 stringlist
stringlist.stream()

这行代码将stringlist(一个包含JSON字符串的列表)转换为一个Stream,以便进行流式处理。

3.映射每个JSON字符串到 ShopType 对象

.map(jsonString -> JSONUtil.toBean(jsonString, ShopType.class))
使用map操作,将Stream中的每个JSON字符串元素映射为对应的ShopType对象。这里JSONUtil.toBean是一个可以将JSON字符串转换为指定类型对象的方法。

​ 4. 收集结果到新的列表

.collect(Collectors.toList());

使用collect操作,将Stream中的元素收集到一个新的列表中。这里使用了Collectors.toList()作为收集器,它会创建一个新的列表来存储转换后的ShopType对象。

二:缓存策略:

2.1.更新:
image-20240422202913302
一般我们选用第一种。

image-20240422203251310

对于三,两种都存在问题:

image-20240422203736068

但是,第二种发生问题的概率极低。所以我们用第二种:
@Override
@Transactional
public Result updateshop(Shop shop) {Long id = shop.getId();if (id == null){return Result.fail("店铺id不能为空");}updateById(shop);stringRedisTemplate.delete("cache:shop:" + id);return Result.ok();
}

2.2.穿透:

image-20240422205639525
我们这里用第一种方法:
if(StrUtil.isNotBlank(s)){Shop shop = JSONUtil.toBean(s,Shop.class);return Result.ok(shop);
}
//注意,null不等于""
if(s != null){return Result.fail("店铺不存在。");
}//......if(shop == null){
//写空值
stringRedisTemplate.opsForValue().set("cache:shop:" + id,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);return Result.fail("店铺不存在。");}

2.3.雪崩:

image-20240422211712419

2.4.击穿:

image-20240422212047122

对比:

image-20240422212634600

image-20240422212722940
注意,逻辑过期是假的过期,但是互斥锁是真的没有该数据的缓存了。

互斥锁:

image-20240422213224412
我们用setnx来实现该锁:
private Boolean tryLock(String key){Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);return BooleanUtil.isTrue(result);}private void unLock(String key){stringRedisTemplate.delete(key);}
防止大量缓存穿透及锁的使用代码:
public Shop queryWithLock(Long id){String s = stringRedisTemplate.opsForValue().get("cache:shop:" + id);log.info(s);if(StrUtil.isNotBlank(s)){Shop shop = JSONUtil.toBean(s,Shop.class);return shop;}if(s != null){return null;}//缓存重建String lockKey = "lock:shop:"+id;Shop shop = null;try {if(!tryLock(lockKey)){Thread.sleep(50);return queryWithLock(id);}shop = getById(id);log.info("shop = " +shop);if(shop == null){//写空值stringRedisTemplate.opsForValue().set("cache:shop:" + id,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);return null;}String key = "cache:shop:" + id;stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {unLock(lockKey);}return shop;}
注意,try-catch-finally是保障锁必须被释放的关键。方法调用如下:
@Overridepublic Result queryById(Long id) {//缓存穿透//Shop shop =queryWithPassTrough(id);//互斥锁Shop shop = queryWithLock(id);if(shop == null){return Result.fail("店铺不存在。");}return Result.ok(shop);}

逻辑过期:

image-20240423125308260

首先是怎么实现设置逻辑过期:
@Data
public class RedisData {private LocalDateTime expireTime;private Object data;
}
然后我们接下来模拟一下将热点事件提前写入redis:
 public void saveShopRedis(Long id,Long expires){Shop shop = getById(id);RedisData redisData = new RedisData();redisData.setData(shop);redisData.setExpireTime(LocalDateTime.now().plusSeconds(expires));stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));}
先给出独立线程代码:
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
  1. ExecutorService: 这是Java的并发包java.util.concurrent中的一个接口,它代表了一个用于执行任务的线程池。线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的ThreadFactory创建一个新线程。
  2. CACHE_REBUILD_EXECUTOR: 这是我们创建的ExecutorService对象的变量名。根据命名,它可能用于重建或刷新某种缓存。
  3. Executors.newFixedThreadPool(10): 这是创建线程池的方法。Executors是Java中的一个工具类,它提供了创建线程池的静态方法。newFixedThreadPool(10)创建了一个固定大小的线程池,其中包含10个线程。这意味着无论提交多少任务,线程池中的线程数量都不会超过10个。当提交的任务数超过线程数时,任务会在队列中等待,直到线程空闲并可以处理它们。

简而言之,这行代码定义了一个私有的、静态的、不可变的ExecutorService对象,该对象是一个包含10个线程的固定线程池,用于处理与缓存重建相关的任务。

接下来就是业务代码:
 public Shop queryWithExpire(Long id){String s = stringRedisTemplate.opsForValue().get("cache:shop:" + id);log.info(s);if(StrUtil.isBlank(s)){return null;}RedisData bean = JSONUtil.toBean(s, RedisData.class);JSONObject object = (JSONObject) bean.getData();Shop shop = JSONUtil.toBean(object, Shop.class);LocalDateTime expirationTime = bean.getExpireTime();if(expirationTime.isAfter(LocalDateTime.now())){return shop;}//缓存重建String lockKey = LOCK_SHOP_KEY + id;boolean isLock = tryLock(lockKey);if(isLock){CACHE_REBUILD_EXECUTOR.submit(()->{try {saveShopRedis(id,20L);} catch (Exception e) {throw new RuntimeException(e);} finally {unLock(lockKey);}});}return shop;}
记得在finally中释放锁。

2.5.自定义工具类:

@Slf4j
@Component
public class CacheClient {private final StringRedisTemplate stringRedisTemplate;private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);public CacheClient(StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public void set(String key, Object value, Long time , TimeUnit unit) {stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);}private Boolean tryLock(String key){Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);return BooleanUtil.isTrue(result);}private void unLock(String key){stringRedisTemplate.delete(key);}public void setWithLogicalExpire (String key, Object value, Long time , TimeUnit unit) {RedisData redisData = new RedisData();redisData.setData(value);redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));}//穿透代码public <R,ID> R queryWithPassTrough (String keyPrefix, ID id, Class<R> type, Function<ID,R>dbFallback,Long time,TimeUnit unit) {String s = stringRedisTemplate.opsForValue().get(keyPrefix + id);log.info(s);if(StrUtil.isNotBlank(s)){return JSONUtil.toBean(s,type);}if(s != null){return null;}R r = dbFallback.apply(id);if(r == null){//写空值stringRedisTemplate.opsForValue().set(keyPrefix + id,"",RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES);return null;}String key = keyPrefix + id;this.set(key,r,time,unit);return r;}public <R,ID>R queryWithExpire (String keyPrefix,ID id,Class<R> type,Function<ID,R>dbFallback,Long time,TimeUnit unit){String s = stringRedisTemplate.opsForValue().get(keyPrefix + id);log.info(s);if(StrUtil.isBlank(s)){return null;}RedisData bean = JSONUtil.toBean(s, RedisData.class);JSONObject object = (JSONObject) bean.getData();R r = JSONUtil.toBean(object, type);LocalDateTime expirationTime = bean.getExpireTime();if(expirationTime.isAfter(LocalDateTime.now())){return r;}//缓存重建String lockKey = LOCK_SHOP_KEY + id;boolean isLock = tryLock(lockKey);if(isLock){CACHE_REBUILD_EXECUTOR.submit(()->{try {R apply = dbFallback.apply(id);this.setWithLogicalExpire(keyPrefix + id,apply,time,unit);} catch (Exception e) {throw new RuntimeException(e);} finally {unLock(lockKey);}});}return r;}
}
补充:
这里的StringRedisTemplate没有调用set方法,也没有注解,但是依然实现了依赖注入,这是因为在spring中,如果注入对象,被注入对象都加入IOC容器,并且有一个单一的构造函数,那么就可以实现不需要注解与配置的依赖注入。
使用举例:
 @Overridepublic Result queryById(Long id) {Shop shop = cacheClient.queryWithPassTrough(CACHE_SHOP_KEY,id,Shop.class,this::getById,CACHE_SHOP_TTL,TimeUnit.MINUTES);if(shop == null){return Result.fail("店铺不存在。");}return Result.ok(shop);}
注意,如果是缓存击穿,一定要数据预热。

三:秒杀业务:

在这种业务中,我们用全局ID生成器来生成ID:

image-20240426175726793

我们用redis来处理,可以满足要求。无论在哪里操作数据库,都是redis。

image-20240426175938648

常见全局唯一ID策略:

image-20240426212805722

注意,这里的数据库自增并不是数据库单纯的自增,因为我们知道,在后面的学习中,是分布式的,到时候,不同的线程可能会导致乱序,重序,这里是单独的那另外一张表做自增,其他的线程都从这里取id。
下面我们来看看基于redis自增的策略:
public class RedisIDWorker {public static final long BEGIN_TIMESTAMP = 1640995200L;private static final int COUNT_BITS = 32;private StringRedisTemplate stringRedisTemplate;public RedisIDWorker (StringRedisTemplate stringRedisTemplate) {this.stringRedisTemplate = stringRedisTemplate;}public long nextId (String keyPrefix) {LocalDateTime now = LocalDateTime.now();long nowSecond = now.toEpochSecond(ZoneOffset.UTC);long timestamp = nowSecond - BEGIN_TIMESTAMP;String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));long count = stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date);return timestamp << COUNT_BITS | count;}
}

increment(String key): 这个方法是对指定的 key 进行自增操作。如果该 key 不存在,那么它的初始值会被设为 0,然后进行自增,结果为 1。如果 key 存在且其值可以转换为整数,则该值会进行自增。

3.2.超卖问题:

image-20240427204959568

image-20240427205407204

image-20240427205711299

但是,这里的票的变化是与版本同步的,所以我们没有必要用版本号,直接用票作为乐观锁。

image-20240427210009616

CAS是指"比较和交换"(Compare and Swap)的缩写,是一种原子操作,通常用于多线程编程中实现同步。CAS操作包括三个参数:需要进行操作的内存位置、旧的预期值和新的值。CAS操作会比较内存位置的当前值与旧的预期值,如果相同则将内存位置的值更新为新的值,否则不做任何操作。CAS操作是一种乐观锁的实现方式,可以避免使用传统的锁机制带来的性能开销和死锁问题。CAS操作在Java中的实现包括AtomicInteger、AtomicLong等类。
@Transactional@Overridepublic Result seckillVoucher(Long voucherId) {SeckillVoucher byId = seckillVoucherService.getById(voucherId);if(byId.getBeginTime().isAfter(LocalDateTime.now())){return Result.fail("秒杀还没开始");}if(byId.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒杀已经结束");}if(byId.getStock()<=0){return Result.fail("库存不足");}boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).eq("stock",byId.getStock()).update();if(!success){return Result.fail("库存不足");}VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setVoucherId(voucherId);long orderId = redisIDWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(UserHolder.getUser().getId());save(voucherOrder);return Result.ok(orderId);}
}
注意,乐观锁也有弊端,他在多个线程同时买票时只能允许一个线程成功,其他线程都会失败,所以我们要解决这个问题:
boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).gt("stock",0).update();
  1. .eq("voucher_id", voucherId)
    • 这是一个条件语句,表示只更新那些voucher_id字段值等于voucherId的记录。voucherId应该是一个变量,存储了要更新的优惠券的ID。
  2. .gt("stock",0)
    • 这是另一个条件语句,表示只更新那些stock字段值大于0的记录。gt是“greater than”的缩写,所以这个条件确保不会更新那些库存已经为0的记录。
  3. .update();
    • 最后,调用update方法执行实际的更新操作。
注意,这里事实上是利用了数据库自带的行锁,当某个事务尝试更新某一行记录时,数据库会为该行加上行锁,阻止其他事务同时修改该行,直到当前事务完成并释放锁。

3.3.一人一单:

Long userid = UserHolder.getUser().getId();
long count = query().eq("user_id",userid).eq("voucher_id",voucherId).count();if(count > 0){return Result.fail("您已经购买过一次!");}
但是依然有问题,用户多线程操作时,依然会存在都判断通过的问题,所以我们先把代码分成几个方法,然后加锁:
 @Transactionalpublic  Result createVoucher(Long voucherId){Long userid = UserHolder.getUser().getId();synchronized(userid.toString().intern()) {long count = query().eq("user_id", userid).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("您已经购买过一次!");}VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setVoucherId(voucherId);long orderId = redisIDWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(UserHolder.getUser().getId());save(voucherOrder);return Result.ok(orderId);}}
这里有两个细节:
1.userid.toString().intern(),当处理一个用户的多个请求时,如果只用toString(),该方法在Java中是new一个新的对象,那么即使是同一个用户,可能他的多次请求的锁都不一样,这是有问题的,但是intern()是在字符串常量池中判断有没有这个这个,如果有,就用这个对象。
2.不能在方法上加锁:因为所有的线程都走一个方法,可能会导致其他问题。

但是这样还是有问题:

因为这里又有事务又有锁,锁在事务的内部,如果锁先释放,事务还没有提交,然后其他线程去查询,有可能查询不到,就会存在问题:
所以我们把锁加在外面:
@Transactional
public  Result createVoucher(Long voucherId){Long userid = UserHolder.getUser().getId();long count = query().eq("user_id", userid).eq("voucher_id", voucherId).count();if (count > 0) {return Result.fail("您已经购买过一次!");}VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setVoucherId(voucherId);long orderId = redisIDWorker.nextId("order");voucherOrder.setId(orderId);voucherOrder.setUserId(UserHolder.getUser().getId());save(voucherOrder);return Result.ok(orderId);
}
Long userid = UserHolder.getUser().getId();
synchronized(userid.toString().intern()) {return createVoucher(voucherId);}
但是这样还是有问题:
在外部同步代码块中直接调用带有@Transactional注解的方法可能会导致事务管理不生效的原因,主要是因为Spring的AOP(面向切面编程)代理机制。@Transactional注解的事务管理是通过Spring的AOP代理来实现的,具体来说:

代理模式:**Spring使用代理(通常是JDK动态代理或者CGLIB代理)来拦截那些标有@Transactional注解的方法调用。**当一个被代理的对象调用这些方法时,Spring会在调用前后插入事务管理的逻辑,比如开始事务、提交或回滚事务。
自我调用问题:在同一个类内部,如果一个方法直接调用另一个带有@Transactional注解的方法,这种调用是“直接调用”,而非通过代理对象进行。因此,Spring的AOP代理无法拦截到这种内部调用,从而不会触发事务管理的行为。
在代码示例中,虽然createVoucher方法上有@Transactional注解,但如果它是直接在类内部通过同步代码块被调用,那么这个直接调用就绕过了Spring的AOP代理,事务管理因此可能不会生效。

为了解决这个问题,确保事务管理能够正常工作,可以考虑以下几种方案:
1.通过代理对象调用:如果必须在类内部调用事务方法,可以通过注入当前类的bean实例来间接调用,这样Spring的AOP代理就能生效。重构方法结构:将事务逻辑和同步逻辑整合到一个方法中,确保事务和同步都在同一个由Spring管理的方法体内执行。
2.使用AspectJ编织:Spring也支持使用AspectJ来实现更细粒度的切面编织,包括对类内部方法调用的拦截,但这需要额外的配置和可能的编译期织入。
综上所述,直接在同步代码块中调用事务方法的问题在于它规避了Spring AOP代理,从而可能忽略了事务管理的预期效果。
修改如下:
	 Long userid = UserHolder.getUser().getId();synchronized(userid.toString().intern()) {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucher(voucherId);}
AopContext.currentProxy(): 这是Spring提供的一个方法,用于在AOP代理方法内部获取当前代理对象。这一步很关键,因为它允许在类内部调用时仍然通过代理来执行,从而确保了@Transactional注解的事务管理能够生效。通过代理调用createVoucher: 通过强制类型转换得到的代理对象proxy来调用createVoucher方法,而不是直接调用。这样做使得事务管理逻辑能够被Spring的AOP代理捕获并正确执行,包括事务的开始、提交或回滚。
导入依赖:
 		<dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency>
暴露代理对象,在启动类上面加上注解:
@EnableAspectJAutoProxy(exposeProxy = true)

集群的并发问题:

image-20240428144744194
每个jvm内部共享一个synchronized锁监视器。如果在同一个tomcat,那肯定是没有问题的,但是如果是多个服务器,就有多个tomcat,JVM,就会出现问题。

3.4.分布式锁:

在这里插入图片描述

image-20240428145811718

image-20240428150237310

所以我们使用Redis。

为了避免释放错误,所以我们加上过期时间。注意,设置过期时间与setnx是两个语句,我们必须确保它们具有原子性。所以我们可以用set语句:
set lock k1 EX 10 NX
我们用非阻塞式实现这个操作。
public class SimpleRedisLock implements ILock{private StringRedisTemplate stringRedisTemplate;private String name;private static final String KEY_PREFIX = "lock:";public SimpleRedisLock(String name,StringRedisTemplate stringRedisTemplate) {this.name = name;this.stringRedisTemplate = stringRedisTemplate;}@Overridepublic void unLock() {stringRedisTemplate.delete(KEY_PREFIX+name);}@Overridepublic boolean tryLock(long timeoutSec) {long id = Thread.currentThread().getId();Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,id+"",timeoutSec, TimeUnit.MINUTES);return Boolean.TRUE.equals(success);}
}
使用方法:
Long userid = UserHolder.getUser().getId();SimpleRedisLock lock= new SimpleRedisLock("order"+userid,stringRedisTemplate);boolean isLock = lock.tryLock(1200);if(!isLock){return Result.fail("不允许重复下单");}try {synchronized(userid.toString().intern()) {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucher(voucherId);}} finally {lock.unLock();}
可能存在的问题:

image-20240428160736009

所以,要判断这个锁是不是自己的,不要把别人的锁释放了。
image-20240428161935640
注意,不要用本身的线程Id作为判断标准,因为不同的JVM有不同的线程Id,代码改进如下:
 private static  final String ID_PREFIX = UUID.randomUUID().toString(true)+"-";//......其他代码@Overridepublic void unLock() {String id = ID_PREFIX + Thread.currentThread().getId();String lock = stringRedisTemplate.opsForValue().get(KEY_PREFIX+name);if(id.equals(lock)) {stringRedisTemplate.delete(KEY_PREFIX + name);}}@Overridepublic boolean tryLock(long timeoutSec) {String id = ID_PREFIX + Thread.currentThread().getId();Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+name,id+"",timeoutSec, TimeUnit.MINUTES);return Boolean.TRUE.equals(success);}
但是事实上,还是有问题的(md,怎么这么多问题):
下面是一个极端情况:

image-20240428164447229

如果线程一判断相同通过了,但是删除锁的时候被阻塞了,还是会删除其他线程的锁。但是为什么这样的情况我们可以删掉呢:

注意这里存入redis的key,是前缀加用户ID,而前缀,后缀都是static修饰的,在同一个服务器内前缀后缀是一样的,而我们这里解决的是一人一单,也就是说,用户的ID也是一样的,因此在同一台服务器下的同一个用户的key就是固定的。其次,我们上面将后缀加上线程设置为key值,这是为了应对多台服务器下的问题,而我们用的redis,也是为了解决多台服务器下的并发问题,因此在这里,反而在同一台服务器下,在判断通过后删除之前的间隙中,删除动作迟迟不执行,然后老线程的key过期了,然后另外一个线程创建了一个相同的key,此时,老线程执行删除动作,就会删除掉新线程的key。注意,删除是按照key的!!!

所以我们引入Lua脚本:

image-20240428171308131

image-20240428172216314

image-20240428172818687

新建一个unlock.lua文件:
if(redis.call('get',KEYS[1]) == ARGV[1]) thenreturn redis.call('del',KEYS[1])
end
return 0
然后使用静态代码块加载脚本文件:
 	private static final DefaultRedisScript<Long>UNLOCK_SCRIPT;static {UNLOCK_SCRIPT = new DefaultRedisScript<>();UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));UNLOCK_SCRIPT.setResultType(Long.class);}
使用脚本:
 @Overridepublic void unLock() {stringRedisTemplate.execute(UNLOCK_SCRIPT,Collections.singletonList(KEY_PREFIX+name),ID_PREFIX+Thread.currentThread().getId());}

image-20240428174619419

3.5.分布式锁优化——Redisson:

image-20240428175026803

因为自己写实在是太烦琐了,所以我们引入了其他第三方服务:

image-20240428175113837

导入依赖:
		<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version></dependency>
配置客户端:
@Configuration
public class RedissonConfig {@Beanpublic RedissonClient redissonClient() {Config config = new Config();config.useSingleServer().setAddress("redis://192.168.6.128").setPassword("kz32330981");return Redisson.create(config);}
}
使用分布式锁:

image-20240502194926872

举例:
 @Overridepublic Result seckillVoucher(Long voucherId) {SeckillVoucher byId = seckillVoucherService.getById(voucherId);if(byId.getBeginTime().isAfter(LocalDateTime.now())){return Result.fail("秒杀还没开始");}if(byId.getEndTime().isBefore(LocalDateTime.now())){return Result.fail("秒杀已经结束");}if(byId.getStock()<=0){return Result.fail("库存不足");}boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherId).gt("stock",byId.getStock()).update();if(!success){return Result.fail("库存不足");}Long userid = UserHolder.getUser().getId();//SimpleRedisLock lock= new SimpleRedisLock("order"+userid,stringRedisTemplate);RLock lock = redissonClient.getLock("lock:order:" + userid);boolean isLock = lock.tryLock();if(!isLock){return Result.fail("不允许重复下单");}try {synchronized(userid.toString().intern()) {IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();return proxy.createVoucher(voucherId);}} finally {lock.unlock();}}
这里boolean isLock = lock.tryLock();没有设置参数,默认失败一次就马上返回。

3.6.Redissson可重入锁原理:

为什么一般情况下不能实现可重入锁:

image-20240502213631856

string只能有一个标识,当我们实现可重入锁时,如果是当前的线程,我们把它的value++,释放锁的时候–,所以我们需要两个标识,所以我们使用hash:

image-20240503115517381

image-20240503115612799

Redisson内部也是这么实现的。

3.6.Redissson可重试与避免超时释放原理:

image-20240503122435943

image-20240503123318771

注意,这里为null说明获取锁成功了。为null说明没有存在这个锁,所以当然会成功了。如果失败,返回的是当前存在的锁的有效期。
重试机制:

在Redisson中,可重试机制通常与分布式锁的实现紧密相关。当客户端尝试获取分布式锁但失败时(如锁已被其他客户端持有),Redisson会利用Redis的发布/订阅机制来实现锁的可重试。下面我将简要解释Redisson是如何结合消息订阅来实现可重试机制的:

  1. 锁获取失败与等待
    • 当客户端尝试获取分布式锁但失败时(比如调用lock.tryLock(waitTime, leaseTime, TimeUnit.unit)方法),Redisson会记录当前线程并启动一个定时任务或监听器来等待锁的释放。
  2. 消息订阅
    • 为了实现等待锁的释放,Redisson会利用Redis的发布/订阅机制。具体来说,Redisson会为每个锁维护一个特定的频道(channel)或主题(topic)。
    • 当客户端尝试获取锁但失败时,它会订阅该锁的频道。这样,一旦锁的持有者释放了锁,Redisson就会在该频道上发布一个消息。
  3. 监听器与重试
    • 客户端在订阅了锁的频道后,会设置一个监听器来监听该频道上的消息。
    • 当锁的持有者释放锁并在频道上发布消息时,监听器会立即收到通知。
    • 监听器在收到通知后,会触发一个重试机制,即再次尝试获取锁。
避免超时释放:
  • Redis中的看门狗策略通常与Redisson库一起使用,用于自动检测并处理过期键的机制。

  • 应用程序使用Redisson库监视Redis服务器上的指定键。当这些键被修改时,看门狗策略会自动触发相应的操作,如更新本地缓存或重新计算依赖项。

  • 这有助于应用程序实时响应Redis中的数据变化,并防止在数据发生变化时出现问题,如死锁和其他并发问题。

  • 具体来说,leaseTime是锁的有效期,也就是锁的持有时间。当调用lock(leaseTime, unit)方法并传入一个非-1的leaseTime时,这个特定的锁将会在一个固定的时间后自动释放,即使锁的持有者仍然在运行。在这种情况下,Redisson不会启动看门狗机制,因为锁的过期时间是由应用程序明确指定的。然而,如果调用lock()方法(没有传入leaseTime参数),或者传入-1作为leaseTime的值,那么锁将没有固定的过期时间。此时,Redisson会启动看门狗机制。看门狗会定期检查锁的持有者是否仍然活跃,并在锁即将过期时自动延长锁的有效期,从而确保锁不会被意外释放,直到锁的持有者显式地释放锁或者持有者不再活跃。
注意,只用我们没有设置时间(或者-1)才会触发这个策略。
3.7.主从一致性:
为什么以前会出现问题:

image-20240506172825214

当主节点出现问题时,从从节点获取锁就会成功。
而Redisson是怎么解决这个问题的呢:

image-20240506173309320

以下是如何使用Redisson的MultiLock的基本步骤:

  1. 配置Redisson客户端
    首先,你需要配置Redisson客户端以连接到Redis集群或哨兵模式,如之前所述。

  2. 创建RLock对象
    对于每个Redis节点,你需要创建一个RLock对象。这些RLock对象将用于组成MultiLock

    RLock lock1 = redisson1.getLock("lock1"); // 假设redisson1连接到第一个Redis节点  
    RLock lock2 = redisson2.getLock("lock2"); // 假设redisson2连接到第二个Redis节点  
    // ... 为其他节点创建RLock对象 ...
    

    注意:虽然锁的名称(如"lock1""lock2")可以不同,但出于一致性和可维护性的考虑,最好使用相同的锁名称。

  3. 组合RLock对象为MultiLock
    使用这些RLock对象创建一个MultiLock

    List<RLock> locks = Arrays.asList(lock1, lock2, /* ... 其他locks ... */);  
    RLock multiLock = redisson.getMultiLock(locks);
    
  4. 使用MultiLock
    现在你可以像使用普通的RLock一样使用MultiLock了。

    multiLock.lock();  
    try {  // 临界区代码,只有获得锁的线程才能执行  
    } finally {  multiLock.unlock();  
    }
    

    注意:在finally块中解锁是非常重要的,以确保锁总是被释放,即使发生异常。

  5. 关闭Redisson客户端
    完成所有操作后,确保关闭所有Redisson客户端实例以释放资源。

    redisson1.shutdown();  
    redisson2.shutdown();  
    // ... 关闭其他Redisson客户端 ...
    
  6. 测试
    为了测试MultiLock在多节点环境中的行为,你可以模拟节点故障、网络分区等情况,并观察MultiLock是否仍然能够正确工作。

总结:

在这里插入图片描述

3.7.异步秒杀:

原来的串行执行:

image-20240506175926269

优化如下:

image-20240506180246721

完整的流程如下:

image-20240506184456708

Part 1:保存到redis:增加redis中的优惠券数量:
 	@Override@Transactionalpublic void addSeckillVoucher(Voucher voucher) {// 保存优惠券save(voucher);// 保存秒杀信息SeckillVoucher seckillVoucher = new SeckillVoucher();seckillVoucher.setVoucherId(voucher.getId());seckillVoucher.setStock(voucher.getStock());seckillVoucher.setBeginTime(voucher.getBeginTime());seckillVoucher.setEndTime(voucher.getEndTime());seckillVoucherService.save(seckillVoucher);stringRedisTemplate.opsForValue().set("seckill:stock:" + voucher.getId(), voucher.getStock().toString());}
Part 2:编写Lua脚本:
image-20240506190359407
local voucherId = ARGV[1]
local userId = ARGV[2]
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
if(tonumber(redis.call('get',stockKey))<= 0) thenreturn 1
end
if(redis.call('sismember',orderKey,userId) == 1) thenreturn 2
end
redis.call('incrby',stockKey,-1)
redis.call('sadd',orderKey,userId)
return 0
  1. 参数定义:
    • ARGV[1]voucherId,表示优惠券ID。
    • ARGV[2]userId,表示用户的ID。
  2. 定义键名:
    • stockKey:这是优惠券库存的键名,格式为 'seckill:stock:' 加上 voucherId
    • orderKey:这是已购买(或已下单)该优惠券的用户的集合的键名,格式为 'seckill:order:' 加上 voucherId
  3. 检查库存:
    • 使用 redis.call('get', stockKey) 获取库存数量。
    • 使用 tonumber 函数将返回的字符串转换为数字。
    • 如果库存数量小于或等于0,则返回 1,表示库存不足。
  4. 检查用户是否已经购买:
    • 使用 redis.call('sismember', orderKey, userId) 检查用户是否已经在 orderKey 集合中。
    • 如果用户已经购买(即用户ID在集合中),则返回 2,表示用户已购买。
  5. 扣减库存并记录订单:
    • 如果上述两个条件都不满足(即库存充足且用户未购买),则执行以下操作:
      • 使用 redis.call('incrby', stockKey, -1) 扣减库存。
      • 使用 redis.call('sadd', orderKey, userId) 将用户ID添加到已购买集合中。
    • 这两个操作是原子性的,因为它们都在同一个Lua脚本中执行。
  6. 返回值:
    • 如果成功扣减库存并记录订单,则返回 0,表示操作成功。
    • 如果库存不足,返回 1
    • 如果用户已购买,返回 2
Part 3:基于阻塞队列的秒杀业务:
image-20240506204827037
private BlockingQueue<VoucherOrder >orderTasks = new ArrayBlockingQueue<>(1024*1024);
@Resource
private RedissonClient redissonClient;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}@Overridepublic Result seckillVoucher(Long voucherId) {Long resultT =stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),String.valueOf(UserHolder.getUser().getId()));int res = resultT.intValue();if(res!=0){return Result.fail(res==1?"库存不足":"不能重复下单");}long id = redisIDWorker.nextId("order");VoucherOrder voucherOrder = new VoucherOrder();voucherOrder.setVoucherId(voucherId);voucherOrder.setId(id);voucherOrder.setUserId(UserHolder.getUser().getId());//在父线程就先获取代理对象proxy = (IVoucherOrderService) AopContext.currentProxy();orderTasks.add(voucherOrder);return Result.ok(id);}
Part 4:异步下单业务:
private static final ExecutorService seckill_order_executor = Executors.newSingleThreadExecutor();private IVoucherOrderService proxy;private class VoucherOrderHandler implements Runnable{@PostConstructpublic void init(){seckill_order_executor.submit(new VoucherOrderHandler());}@Overridepublic void run() {while (true){try {VoucherOrder voucherOrder = orderTasks.take();handleVoucherOrder(voucherOrder);} catch (Exception e) {log.error("处理订单异常",e);}}}
}
  1. @PostConstruct注解:

    这个注解标记在init()方法上,表示该方法会在类的实例创建并注入到Spring容器后立即调用。这里是用来启动任务处理的逻辑。

  2. init()方法:

    seckill_order_executor是一个线程池,可能是ExecutorService类型的实例。
    方法中提交了一个新的VoucherOrderHandler实例到线程池执行。这意味着每次初始化VoucherOrderHandler,都会启动一个新的线程来处理订单任务,形成一个循环执行的任务队列。

  3. run()方法:

    作为Runnable接口的实现,run()方法包含实际的业务逻辑。使用一个无限循环while (true),确保线程将持续运行,直到程序停止或线程被显式中断。

  4. try-catch块:

    在循环中,从orderTasks队列(可能是BlockingQueue类型)中获取一个VoucherOrder对象并调用handleVoucherOrder(voucherOrder)进行处理。如果在处理过程中出现任何异常,捕获并记录错误日志,但不会中断循环。这样可以确保即使处理过程中出现问题,线程仍能继续尝试处理下一个订单。

private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("lock:order:" + userId);boolean isLock = lock.tryLock();if(!isLock){log.error("不允许重复下单");return;}try {proxy.createVoucher(voucherOrder);}finally {lock.unlock();}}
注意这里的两处细节,userId与createVoucher都与父线程有关系,而在这里是异步的子线程,所以代理对象要在父线程就创建好,而userid也不能通过线程本地直接获取。这里用Redisson重复验证是为了让线程更加安全。
 @Transactionalpublic void createVoucher (VoucherOrder voucherOrder){Long userid = voucherOrder.getId();long count = query().eq("user_id", userid).eq("voucher_id", voucherOrder.getVoucherId()).count();if (count > 0) {log.error("用户已经购买过一次!");return ;}boolean success = seckillVoucherService.update().setSql("stock = stock -1").eq("voucher_id", voucherOrder.getVoucherId()).gt("stock",0).update();if(!success){log.error("库存不足");return;}save(voucherOrder);}
重复验证道理同上。

3.8.阻塞队列优化——消息队列:

基于上面的阻塞队列存在两个问题:1.内存问题 2.服务宕机的安全问题,所以我们必须进行优化。

在这里插入图片描述

1.独立于JVM,解决内存问题。
2.内部做了数据的持久化,避免安全问题。
image-20240507125920501
(1):基于List

image-20240507130119033

image-20240507130403241

(2):PubSub:

image-20240507134533877

image-20240507134641523

image-20240507135041817

image-20240507135131273

如果没有被任何频道订阅,会直接丢失。

(3):Stream:

发送消息:

image-20240507135521200

读取消息:

image-20240507135729110

image-20240507140014875

image-20240507140118244

3.9.基于Stream的消费者组:

image-20240507184608215

image-20240507184732468

image-20240507184942090

基于Java代码:

image-20240507192911659

image-20240507193426970

总结:

image-20240507193455216

3.10.代码改造:

image-20240507193619350

(1):创建消息队列:
XGROUP CREATE stream.orders g1 0 MKSTREAM
这是创建消费者组,自动创建了队列。
(2):修改Lua脚本:
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
if(tonumber(redis.call('get',stockKey))<= 0) thenreturn 1
end
if(redis.call('sismember',orderKey,userId) == 1) thenreturn 2
end
redis.call('incrby',stockKey,-1)
redis.call('sadd',orderKey,userId)
redis.call('xadd','strem.orders','*','userId',userId,'voucherId',voucherId,'id',orderId)
return 0
这行代码是使用 Redis 的 XADD 命令向名为 “stream.orders” 的 Stream 数据结构中添加一条新的消息。该消息包含了三个字段:“userId”、“voucherId” 和 “id”,分别对应传入的参数 userId、voucherId 和 orderId 的值。这样可以将订单相关的信息存储在 Redis 中,并且可以通过 Stream 数据结构的功能对这些信息进行处理和查询。这种方式可以方便地实现消息队列、事件日志等功能。
(3):修改发消息业务代码:
@Override
public Result seckillVoucher(Long voucherId) {
long id = redisIDWorker.nextId("order");
Long resultT =stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),String.valueOf(UserHolder.getUser().getId()),String.valueOf(id)
);
int res = resultT.intValue();
if(res!=0){return Result.fail(res==1?"库存不足":"不能重复下单");
}proxy = (IVoucherOrderService) AopContext.currentProxy();
return Result.ok(id);
}
(4):收消息:
 private class VoucherOrderHandler implements Runnable{String queueName = "stream.orders";@PostConstructpublic void init(){seckill_order_executor.submit(new VoucherOrderHandler());}@Overridepublic void run() {while (true){try {List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));if(list == null || list.isEmpty()){continue;}//解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);//ackstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理订单异常",e);handlePendinglist();}}}private void handlePendinglist() {while (true){try {List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));if(list == null || list.isEmpty()){break;}//解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);//ackstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理pending异常",e);try {Thread.sleep(20);} catch (InterruptedException interruptedException) {e.printStackTrace();}}}}}
//增加订单业务不变
private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("lock:order:" + userId);boolean isLock = lock.tryLock();if(!isLock){log.error("不允许重复下单");return;}try {proxy.createVoucher(voucherOrder);}finally {lock.unlock();}}
我们分成三个部分来看:
4.1.正常接受处理消息:
String queueName = "stream.orders";@PostConstructpublic void init(){seckill_order_executor.submit(new VoucherOrderHandler());}@Overridepublic void run() {while (true){try {List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));if(list == null || list.isEmpty()){continue;}//解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);//ackstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理订单异常",e);handlePendinglist();}}}
  1. 参数详解:
    • Consumer.from(“g1”, “c1”)
      • 这表示从消费者组 “g1” 中的消费者 “c1” 读取数据。在 RedisStreams 中,消费者组用于允许多个消费者并发地读取同一个 Stream。注意,这里因为没有c1,所以先创建了c1。
    • StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2))
      • StreamReadOptions.empty() 创建一个空的读取选项对象。
      • .count(1) 设置读取的条目数量为 1。这意味着,无论 Stream 中有多少未读的数据,此方法仅返回一个条目。
      • .block(Duration.ofSeconds(2)) 设置读取操作的阻塞时间。如果 Stream 中没有新的数据可读,那么这个方法会阻塞最多 2 秒,然后返回一个空列表。
    • StreamOffset.create(queueName, ReadOffset.lastConsumed())
      • 这定义了从哪个位置开始读取 Stream。
      • queueName 是 Stream 的名称。
      • ReadOffset.lastConsumed() 表示从消费者 “c1” 最后消费的位置开始读取。如果消费者 “c1” 还没有消费过任何数据,那么这个位置可能是 Stream 的开始位置或任何其他预定义的位置(例如 Stream 的起始点)。
等价于:
XREADGROUP GROUP g1 c1 COUNT 1 STREAMS mystream >
4.2.处理因异常而位于pending-list的消息:
  private void handlePendinglist() {while (true){try {List< MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));if(list == null || list.isEmpty()){break;}//解析消息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);handleVoucherOrder(voucherOrder);//ackstringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());} catch (Exception e) {log.error("处理pending异常",e);try {Thread.sleep(20);} catch (InterruptedException interruptedException) {e.printStackTrace();}}}}
与上面同理,只是这里是获取未确认的消息,这里的异常不做递归处理,因为外面本身就是true循环。
4.3.新增:
//增加订单业务不变
private void handleVoucherOrder(VoucherOrder voucherOrder) {Long userId = voucherOrder.getUserId();RLock lock = redissonClient.getLock("lock:order:" + userId);boolean isLock = lock.tryLock();if(!isLock){log.error("不允许重复下单");return;}try {proxy.createVoucher(voucherOrder);}finally {lock.unlock();}}

四:其他业务:

4.1.不存在该表中字段处理方法如下,后续需要自己维护:

@TableField(exist = false)
private String icon;
/*** 用户姓名*/
@TableField(exist = false)
private String name;

4.2.点赞排序:

image-20240509175116540

改造前:
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
//......
stringRedisTemplate.opsForSet().add(key,userId.toString());
//......
stringRedisTemplate.opsForSet().remove(key,userId.toString());
改造后:
Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
//...
stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
//......
stringRedisTemplate.opsForZSet().remove(key,userId.toString());
部分代码如下:
 @Overridepublic Result queryBlogLikes(Long id) {String key = "blog:liked:" + id;Set <String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);if(range == null || range.isEmpty()){return Result.ok(Collections.emptyList());}List<Long> ids =  range.stream().map(Long::valueOf).collect(Collectors.toList());List <UserDTO> users = userService.listByIds(ids).stream().map(user-> BeanUtil.copyProperties(user,UserDTO.class)).collect(Collectors.toList());return Result.ok(users);}
注意,这里有个问题,取出的结果是有序的,都是我们在进行userService.listByIds(ids)时,数据库内部用的是WHERE id IN(1,5),而它在数据库内部的索引中,就优化了,导致无论怎么样,都是先1再5。可以用下面的sql语句优化:

image-20240509194119419

在MP中,就可以这么写:
@Override
public Result queryBlogLikes(Long id) {String key = "blog:liked:" + id;Set <String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);if(range == null || range.isEmpty()){return Result.ok(Collections.emptyList());}List<Long> ids =  range.stream().map(Long::valueOf).collect(Collectors.toList());String idStr = StrUtil.join(",", ids);List <UserDTO> users = userService.query().in("id",ids).last("ORDER BY FIELD(id,"+idStr+")").list().stream().map(user-> BeanUtil.copyProperties(user,UserDTO.class)).collect(Collectors.toList());return Result.ok(users);
}

4.3.共同关注:

基于Set集合的交集:
 @Overridepublic Result followCommons(Long id) {Long userId = UserHolder.getUser().getId();Set<String> intersect = stringRedisTemplate.opsForSet().intersect("follows:" + userId, "follows:" + id);if(intersect==null||intersect.isEmpty()){return Result.ok(Collections.emptyList());}List<Long> collect = intersect.stream().map(Long::valueOf).collect(Collectors.toList());List<UserDTO> userDTOS = userService.listByIds(collect).stream().map(user-> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());return Result.ok(userDTOS);}

4.4.关注推送——Feed流

image-20240509211548013

image-20240509212052545

image-20240509212325997

image-20240509213611387

image-20240509213803443

image-20240509213920882

以下是基于推模式:
image-20240509214104598
分页问题:

image-20240509214321964

image-20240509214603581

滚动查询:

image-20240512185712520

代码实现如下:
@Overridepublic Result queryBlogOfFollow(Long max, Integer offset) {Long userId = UserHolder.getUser().getId();String key = "feed:" + userId;Set <ZSetOperations.TypedTuple <String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);if(typedTuples == null || typedTuples.isEmpty()){return Result.ok();}List<Long>ids = new ArrayList<>(typedTuples.size());long minTime = 0;int os = 1;for(ZSetOperations.TypedTuple <String> tuple : typedTuples){ids.add(Long.valueOf(tuple.getValue()));long time = tuple.getScore().longValue();if(time == minTime){os++;}else{minTime = time;os = 1;}}List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + StrUtil.join(",", ids) + ")").list();for (Blog blog : blogs) {queryBlogUser(blog);queryBlogLikes(blog);}ScoreResult r = new ScoreResult();r.setList(blogs);r.setMinTime(minTime);r.setOffset(os);return Result.ok(r);}
下面是关键代码解释:
 Set <ZSetOperations.TypedTuple <String>> typedTuples = stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);

参数

  1. key: 这是 Redis 中有序集合的键名。在这个例子中,它是通过 "feed:" + userId 构建的,其中 userId 是从 UserHolder 获取的当前用户的 ID。
  2. 0: 分数范围查询的起始分数(包含)。在这个例子中,查询从分数 0 开始。
  3. max: 分数范围查询的结束分数(不包含)。这是你要查询的最高分数(不包括这个分数)。
  4. offset: 分页查询的偏移量。这表示从有序集合中跳过多少个元素后再开始返回结果。
  5. 2: 这是一个限制参数,表示最多返回 2 个元素。然而,在实际应用中,这个限制可能不是非常有用,因为它会限制返回结果的数量。通常,你可能会根据分页参数(如每页大小)来动态设置这个值。
在这个例子中,通过 reverseRangeByScoreWithScores(key, 0, max, offset, 2),你正在查询从分数 0 到 max(不包含)之间的博客,按分数从高到低排序,并跳过 offset 个元素,最后最多返回 2 个结果。每个结果都是一个 TypedTuple,它包含博客的 ID 和其发布时间(作为分数)。
注意,分布时间越前的,id越小,所以从零开始,而且这里的os,及偏移量,是上次发布时间最早的时间的重复个数,不是在所有结果中的个数,比如时间戳为4 4 4 3,每次两个,第一次查出4与4,4虽然有三个,但是这里记录的是上一次查询中重复的最小时间,及两个,下一次,就从0到最大时间4,并且跳过2个,即第三个4开始查询。

4.5.附近商铺:

在这里插入图片描述

image-20240512202043735

我们可以先做数据预热,先在test中存入redis:
@Testpublic void test(){List<Shop> list = shopService.list();Map<Long, List<Shop>> map = list.stream().collect(Collectors.groupingBy(Shop::getTypeId));for(Map.Entry<Long, List<Shop>> entry : map.entrySet()){Long typeId = entry.getKey();String key = "shop:geo:" + typeId;List<Shop> value = entry.getValue();for(Shop shop : value){stringRedisTemplate.opsForGeo().add(key,new org.springframework.data.geo.Point(shop.getX(),shop.getY()),shop.getId().toString());}}
}
或者可以这么写:

image-20240512203734547

代码如下:
 @Overridepublic Result queryShopByType(Integer typeId, Integer current, Double x, Double y) {if (x == null || y == null){// 根据类型分页查询Page<Shop> page = query().eq("type_id", typeId).page(new Page<>(current, SystemConstants.DEFAULT_PAGE_SIZE));// 返回数据return Result.ok(page.getRecords());}int from = (current - 1) * SystemConstants.DEFAULT_PAGE_SIZE;int end = (current + 1) * SystemConstants.DEFAULT_PAGE_SIZE;String key = SHOP_GEO_KEY + typeId;GeoResults<RedisGeoCommands.GeoLocation<String> >search = stringRedisTemplate.opsForGeo().search(key,GeoReference.fromCoordinate(x, y),new Distance(5000),RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end));if(search == null){return Result.ok();}List<GeoResult<RedisGeoCommands.GeoLocation<String>>>contents = search.getContent();if(contents.size()<=from){return Result.ok();}//截取List <Long> ids = new ArrayList<>(contents.size());Map<String,Distance>distanceMap = new HashMap<>(contents.size());contents.stream().skip(from).forEach(result->{String shopIdStr = result.getContent().getName();ids.add(Long.valueOf(shopIdStr));Distance distance = result.getDistance();distanceMap.put(shopIdStr,distance);});List<Shop> shops = query().in("id",ids).last("ORDER BY FIELD(id," + StrUtil.join(",", ids) + ")").list();for (Shop shop : shops) {shop.setDistance(distanceMap.get(shop.getId().toString()).getValue());}return Result.ok(shops);}

4.6.用户签到:

image-20240512215935791

image-20240512220014515

image-20240513163758603

  @Overridepublic Result sign() {Long userId = UserHolder.getUser().getId();LocalDateTime now = LocalDateTime.now();String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String key = USER_SIGN_KEY + userId + keySuffix;int day = now.getDayOfMonth() - 1;stringRedisTemplate.opsForValue().setBit(key, day, true);return Result.ok();}
String keySuffix = now.format(DateTimeFormatter.ofPattern(“:yyyyMM”));: 根据当前日期的年月(格式为"yyyyMM",例如"202302")创建一个字符串后缀,用于构建Redis键。
int day = now.getDayOfMonth() - 1;: 获取当前日期减1(因为数组或位数组通常从0开始计数,所以这里将日期减1来对应位数组的索引)。
stringRedisTemplate.opsForValue().setBit(key, day, true);: 使用Spring Data Redis的StringRedisTemplate操作位值。在Redis的键key对应的位数组中,设置第day位为true,表示用户在这一天签到了。
连续签到:

在这里插入图片描述

 @Overridepublic Result signCount() {Long userId = UserHolder.getUser().getId();LocalDateTime now = LocalDateTime.now();String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM"));String key = USER_SIGN_KEY + userId + keySuffix;int day = now.getDayOfMonth();List <Long> result = stringRedisTemplate.opsForValue().bitField(key,BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(day)).valueAt(0));if(result == null || result.isEmpty()){return Result.ok(0);}Long num = result.get(0);if(num == null || num == 0){return Result.ok(0);}int count = 0;while (true){if((num & 1) == 0){break;}else {count++;}num >>>= 1;}return Result.ok(count);}
关键代码如下:
List <Long> result = stringRedisTemplate.opsForValue().bitField(key,BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(day)).valueAt(0));
这段代码使用 StringRedisTemplate 和 Redis 的 bitField 命令从一个 Redis 字符串键的指定偏移量开始,获取一个指定大小的位字段的无符号整数值,并将结果存储在一个 List<Long> 中。

4.7.UV统计:

image-20240513170528253

在这里插入图片描述

image-20240513171219913

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/697445.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第187题| 快速学会“阿贝尔定理”| 无穷级数(十五)|武忠祥老师每日一题

解题思路&#xff1a;这道题没有告诉我们是多少&#xff0c;没办法求出收敛半径&#xff0c;所以我们只能根据题目给的两个条件来解题&#xff08;选项代入法&#xff09;。 1.x-1&#xff0c;说明收敛的中心点是1&#xff0c;观察下列选项&#xff0c;显然答案在C和D之中。 …

腐烂的橘子 - (LeetCode)

一、概述 994. 腐烂的橘子 - 力扣&#xff08;LeetCode&#xff09;&#xff0c;今天刷到这道题&#xff0c;开始按照自己实现的思路写了一次&#xff0c;通过了调试&#xff0c;但是提交的时候&#xff0c;来了一个大的数据&#xff0c;就没有通过测试&#xff0c;百思不得其…

HIVE解决连续登录问题

HIVE解决连续登录问题 目录 HIVE解决连续登录问题 1.解决连续登录问题 如何去分析数据&#xff1a; 2.需求&#xff1a; 3.-- 间隔天数 1.解决连续登录问题 如何去分析数据&#xff1a; 1&#xff09;查看数据的字段信息 …

【简单介绍下Milvus】

&#x1f308;个人主页: 程序员不想敲代码啊 &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共…

01基础篇

1、初识 JVM 1.1 什么是 JVM JVM 全称是 Java Virtual Machine&#xff0c;中文译名 Java虚拟机。JVM 本质上是一个运行在计算机上的程序&#xff0c;他的职责是运行Java字节码文件。 Java源代码执行流程如下&#xff1a; 分为三个步骤&#xff1a; 编写Java源代码文件。使用…

数字化信息协同助力智能巡查,基于YOLOv5全系列【n/s/m/l/x】参数模型开发构建无人机数字侦查场景下智能靶标检测识别系统

无人机的快速发展与普及&#xff0c;使得其进入千家万户各行各业&#xff0c;发挥着越来越重要的作用。随着科技的飞速发展&#xff0c;未来的数字信息化战场正逐渐展现出其独特的作战形态。在这个以数据和信息为主导的新战场上&#xff0c;无人机侦查手段与人工智能目标智能检…

template——模板进阶(C++)

在之前的文章中&#xff0c;介绍了模板初阶&#xff1a;Cpp_桀桀桀桀桀桀的博客-CSDN博客 在本篇中将会对模板进一步的讲解。本篇中的主要内容为&#xff1a;非类型模板参数、函数模板的特化、类模板的特化&#xff08;其中包含全特化和偏特化&#xff09;&#xff0c;最后讲解…

每日一题11:Pandas:数据重塑-透视

一、每日一题 解答&#xff1a; import pandas as pddef pivotTable(weather: pd.DataFrame) -> pd.DataFrame:df_pivot weather.pivot(indexmonth, columnscity, valuestemperature)return df_pivot 题源&#xff1a;力扣 二、总结 Pandas 是一个强大的 Python 数据分析…

三更草堂前后端分离个人博客项目的开发笔记

文章目录 项目实战-前后端分离博客系统1.课程介绍2.创建工程3.博客前台3.0 准备工作3.1 SpringBoot和MybatisPuls整合配置测试 3.1 热门文章列表3.1.0 文章表分析3.1.1 需求3.1.2 接口设计3.1.3 基础版本代码实现3.1.4 使用VO优化3.1.5 字面值处理 3.2 Bean拷贝工具类封装3.2 查…

深入理解C#中的IO操作:Path类的详解

文章目录 前言一、Path类的概述二、Path类的主要方法2.1 Path.GetFullPath(string relativePath)2.2 Path.GetDirectoryName(string path)2.3 Path.GetFileName(string path)2.4 Path.GetFileNameWithoutExtension(string path)2.5 Path.GetExtension(string path)2.6 Path.Com…

ubuntu下使用docker安装es和kibana以及ik分词器还有logstash

友情提醒&#xff1a;es和kibana的版本最好一致 0.准备工作 mkdir -p /home/elasticsearch/data/ mkdir -p /home/elasticsearch/config/ mkdir -p /home/elasticsearch/plugins/ chmod -R 777 /home/elasticsearch 编写配置文件 echo http.host: 0.0.0.0 http.cors.ena…

二叉树的前序、中序、后序遍历

二叉树的前序、中序、后序 1.二叉树的前序遍历 题目&#xff1a; 二叉树的前序遍历 给你二叉树的根节点 root &#xff0c;返回它节点值的 前序 遍历。 示例 1&#xff1a; 输入&#xff1a;root [1,null,2,3] 输出&#xff1a;[1,2,3]示例 2&#xff1a; 输入&#xff…