JedisCluster 通过 Pipeline 实现两套数据轮换更新

其他系列文章导航

Java基础合集
数据结构与算法合集

设计模式合集

多线程合集

分布式合集

ES合集


文章目录

其他系列文章导航

文章目录

前言

一、整体流程

1.1 大致流程

1.2 流程代码解释

二、从数据库里查数据

2.1 SQL语句

三、更新当前前缀

3.1 设置前缀常量

3.2 初始化 currentPrefixIndex

3.3 获取当日前缀 

 3.4 更新 currentPrefixIndex

四、往redis集群更新数据

4.1 大致流程

五、JedisCluster 实现 Pipeline 操作

5.1 实现过程


前言

本文实现了通过定时任务来调用接口,使两套数据轮换更新。

因为要区分两套数据,所以 key 要设置前缀。

例如:一天数据一换,今天查的 A 开头的 key ,明天查 B 开头的 key ,后天又查 A 开头的 key 。今天查完后,明天更新 B 开头的 key ,但是 A 开头的 key 暂时不动,后天再查的时候,A开头的 key 要进行更新,先删再更新。


一、整体流程

1.1 大致流程

  1. 从数据库里查数据。
  2. 更新当前前缀。
  3. 往redis集群更新数据。

1.2 流程代码解释

    @Overridepublic R<String> updateCampToJedis() {R<String> r = new R<>();SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMM");String currentMonth = dateFormat.format(new Date());//1. 从数据库里查数据List<UserWideInfo> UserWideInfoList = UserWideInfoMapper.selectFromTable(currentMonth);if (UserWideInfoList.size() == 0) {r.setCode(R.ERROR_CODE);r.setMsg("没有数据");return r;}//2. 更新当前前缀updateCurrentPrefixIndex();r.setCode(R.SUCCESS_CODE);//3. 往redis集群存入数据insertToJedis(ZhmsUserWideInfoList);return r;}

二、从数据库里查数据

2.1 SQL语句

这里因为每个月查询的是不同月份的表,所有用到动态 sql 。

    <select  id="selectFromTable" resultType="com.hopedata.zhmscloud.camp.entity.po.ZhmsUserWideInfo">SELECT * FROM USER_WIDE_INFO_M_${SysMonth}</select>

三、更新当前前缀

要做到更新当前前缀,需要有两套前缀不同的 key ,还需要一个能区分前缀的前缀索引 currentPrefixIndex 。

3.1 设置前缀常量

用 A 和 B 来区分两组 key 。

代码如下: 

    private static final String PREFIX_A = "A";private static final String PREFIX_B = "B";

3.2 初始化 currentPrefixIndex

向 redis集群中存入初始的 currentPrefixIndex 。

代码如下: 

    @GetMapping("/init")public String init() {return jedisCluster.set("currentPrefixIndex", "0");}

3.3 获取当日前缀 

先取出当日的前缀索引 currentPrefixIndex ,与 2 取余数 ,来获取当日的前缀。

代码如下: 

     //获取当日前缀private String getKeyPrefix() {int currentPrefixIndex = Integer.parseInt(jedisCluster.get("currentPrefixIndex"));if (currentPrefixIndex % 2 == 0) {return PREFIX_A;} else {return PREFIX_B;}}

 3.4 更新 currentPrefixIndex

每天需要更新前缀索引 currentPrefixIndex ,让 currentPrefixIndex + 1 , 使区分读的数据。

代码如下: 

    // 重新设置currentPrefixIndexprivate void updateCurrentPrefixIndex() {String currentValue = jedisCluster.get("currentPrefixIndex");int newValue = Integer.parseInt(currentValue) + 1;jedisCluster.set("currentPrefixIndex", String.valueOf(newValue));}

四、往redis集群更新数据

这其实是最重要的一步,因为同时存入大量的数据,所以要使用到 Pipeline 来实现。

4.1 大致流程

  1. 获取到当前前缀,查出相关的 key ,更新数据之前把旧数据删除。
  2. 把新数据解析后更新到 redis 集群。

注意:因为数据量大,为了减少网络性能消耗,删除和更新都要用 Pipeline 来操作。

代码如下:

    private void insertToJedis(List<UserWideInfo> UserWideInfoList) {String keyPrefix = getKeyPrefix();List<String> keys = new ArrayList<>();Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();for (JedisPool node : clusterNodes.values()) {try (Jedis jedis = node.getResource()) {Set<String> nodeKeys = jedis.keys(keyPrefix + "*");keys.addAll(nodeKeys);}}Map<JedisPool, List<String>> delKey = assignKey(keys, jedisCluster);//先删旧的for (JedisPool jedisPool : delKey.keySet()) {try (Jedis jedis = jedisPool.getResource()){Pipeline pipelined = jedis.pipelined();List<String> keysList = delKey.get(jedisPool);for (String key : keysList) {pipelined.del(key);}pipelined.sync();}}List<String> keyList =new ArrayList<>();HashMap<String, String> map = new HashMap<>();//填充keyList和valuefor (UserWideInfo UserWideInfo : UserWideInfoList) {String key = keyPrefix + "_" + UserWideInfo.getBillNo();keyList.add(key);//构建value......map.put(key, value);}Map<JedisPool, List<String>> result = assignKey(keyList, jedisCluster);for (JedisPool jedisPool : result.keySet()) {try (Jedis jedis = jedisPool.getResource()){Pipeline pipelined = jedis.pipelined();// 获取当前JedisPool对应的键列表List<String> keysList = result.get(jedisPool); // 将命令添加到Pipeline中for (String key : keysList) {String value = map.get(key);pipelined.set(key, value);}// 执行Pipeline中的所有命令pipelined.sync();}}}

五、JedisCluster 实现 Pipeline 操作

5.1 实现过程

因为 JedisCluster 不支持 Pipeline 操作,所以需要自己来实现。

代码如下:

@Slf4j
public class JedisPipelineUtil {/*** jedis集群下使用pipeline之前先将key分配管道* Map<String, List<String>> 键值为节点ip和端口号 192.168.1.1:6397 value为redis存入的key** @param list         存redis的key* @param jedisCluster* @return*/public static Map<String, List<String>> assignSlot(List<String> list, JedisCluster jedisCluster) {Map<String, List<String>> hostPhoneMap = new HashMap<>();Map<String, JedisPool> clusterNodes = jedisCluster.getClusterNodes();Map.Entry<String, JedisPool> next = clusterNodes.entrySet().iterator().next();JedisPool jedisPool = next.getValue();Jedis jedis = jedisPool.getResource();Map<Integer, String> slots = discoverClusterSlots(jedis);for (String s : list) {String hostAndPort = slots.get(JedisClusterCRC16.getSlot(s));if (hostPhoneMap.containsKey(hostAndPort)) {hostPhoneMap.get(hostAndPort).add(s);} else {List<String> newList = new ArrayList<>();newList.add(s);hostPhoneMap.put(hostAndPort, newList);}}jedis.close();return hostPhoneMap;}/*** jedis集群下使用pipeline之前先将key分配管道* Map<JedisPool, List<String>> 键值为节JedisPool value为redis存入的key** @param list         存redis的key* @param jedisCluster* @return*/public static Map<JedisPool, List<String>> assignKey(List<String> list, JedisCluster jedisCluster) {Map<JedisPool, List<String>> map = new HashMap<>();Map<String, List<String>> var1 = assignSlot(list, jedisCluster);Iterator<Map.Entry<String, List<String>>> iterator = var1.entrySet().iterator();while (iterator.hasNext()) {Map.Entry<String, List<String>> next = iterator.next();JedisPool jedisPool = jedisCluster.getClusterNodes().get(next.getKey());map.put(jedisPool, next.getValue());}return map;}private static Map<Integer, String> discoverClusterSlots(Jedis jedis) {Map<Integer, String> slotsMap = new HashMap<>();List<Object> slots = jedis.clusterSlots();Iterator var3 = slots.iterator();while (var3.hasNext()) {Object slotInfoObj = var3.next();List<Object> slotInfo = (List) slotInfoObj;if (slotInfo.size() > 2) {List<Integer> slotNums = getAssignedSlotArray(slotInfo);List<Object> hostInfos = (List) slotInfo.get(2);if (!hostInfos.isEmpty()) {String targetNode = generateHostAndPort(hostInfos);Iterator<Integer> var4 = slotNums.iterator();while (var4.hasNext()) {Integer slot = var4.next();slotsMap.put(slot, targetNode);}}}}return slotsMap;}private static List<Integer> getAssignedSlotArray(List<Object> slotInfo) {List<Integer> slotNums = new ArrayList<>();for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1)).intValue(); ++slot) {slotNums.add(slot);}return slotNums;}private static String generateHostAndPort(List<Object> hostInfos) {String host = SafeEncoder.encode((byte[]) hostInfos.get(0));int port = ((Long) hostInfos.get(1)).intValue();return host + ":" + port;}
}

使用 assignKey 方法就可以分配管道。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/278777.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计模式——原型模式(创建型)

引言 原型模式是一种创建型设计模式&#xff0c; 使你能够复制已有对象&#xff0c; 而又无需使代码依赖它们所属的类。 问题 如果你有一个对象&#xff0c; 并希望生成与其完全相同的一个复制品&#xff0c; 你该如何实现呢&#xff1f; 首先&#xff0c; 你必须新建一个属于…

overleaf 加载pdf格式的矢量图时,visio 图片保存为pdf格式,如何确保pdf页面大小和图片一致

Overleaf支持多种矢量图形格式&#xff0c;其中一些常见的包括&#xff1a; PDF&#xff08;Portable Document Format&#xff09;&#xff1a; PDF是一种常见的矢量图形格式&#xff0c;Overleaf可以直接加载和显示PDF文件。许多绘图工具和LaTeX生成的图形都可以导出为PDF格式…

vue中哪些数组的方法可以做到响应式

Vue2 中为什么直接通过数组的索引修改元素是不会触发视图更新 vue2 为什么不直接监听数组 Vue2 对于数组提供了一些变异方法 重写数组方法源码分析 定义拦截器将拦截器挂载到数组上面收集依赖 扩展&#xff1a;理解Vue2如何解决数组和对象的响应式问题 对复杂对象的处理 复杂对…

【ThemeStudio】安装报错A Javascript error occurred in the main process

报错内容: 问题原因&#xff1a;系统环境缺少microsoft visual c插件 解决方法&#xff1a; 下载 微软VC 地址

elementui + vue2实现表格行的上下移动

场景&#xff1a; 如上&#xff0c;要实现表格行的上下移动 实现&#xff1a; <el-dialogappend-to-bodytitle"条件编辑":visible.sync"dialogVisible"width"60%"><el-table :data"data1" border style"width: 100%&q…

爬虫akamai案例:DHL国际物流

声明&#xff1a; 该文章为学习使用&#xff0c;严禁用于商业用途和非法用途&#xff0c;违者后果自负&#xff0c;由此产生的一切后果均与作者无关 一、Akamai简介 Akamai是一家提供内容传递网络&#xff08;CDN&#xff09;和云服务的公司。CDN通过将内容分发到全球各地的服…

uniapp:使用fixed定位,iOS平台的安全区域问题解决

manifest.json > 添加节点 "safearea": { //iOS平台的安全区域"background": "#1C1E22","backgroundDark": "#1C1E22", // HX 3.1.19支持"bottom": {"offset": "auto"} },已解决&#xff…

call 和 apply:改变对象行为的秘密武器(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

docker consul 容器的自动发现与注册

consul相关知识 什么是注册与发现 服务注册与发现是微服务架构中不可或缺的重要组件。起初服务都是单节点的&#xff0c;不保障高可用性&#xff0c;也不考虑服务的压力承载&#xff0c;服务之间调用单纯的通过接口访问。直到后来出现了多个节点的分布式架构&#xff0c;起初的…

flowable工作流看这一篇就够了(高级篇 上)

目录 一、Flowable基础表结构 1.1、表结构讲解 1.2、ProcessEngine讲解 1.2.1、硬编码的方式 1.2.2、配置文件 1.2.3、自定义配置文件 1.3、Service服务接口 1.3.1、Service创建方式 1.3.2、Service总览 1.4、图标介绍 1.4.1、事件图标 1.4.2、活动(任务)图标 1.4…

python 小程序学生选课系统源码

开发工具&#xff1a; PyCharm&#xff0c;mysql5.7&#xff0c;微信开发者工具 技术说明&#xff1a; python django html 小程序 功能介绍&#xff1a; 学生&#xff1a; 登录&#xff0c;选课&#xff08;查看课程及选择&#xff09;&#xff0c;我的成绩&#xff0c;…

解决kernel32.dll丢失的修复方式,kernel32.dll预防错误的方法

kernel32.dll文件是电脑中的一个重要文件&#xff0c;如果电脑出现kernel32.dll丢失的错误提示&#xff0c;那么电脑中的一些程序将不能正常使用&#xff0c;那么出现这样的问题有什么解决办法呢&#xff1f;那么今天就和大家说说解决kernel32.dll丢失的修复方式。 一.kernel32…