SaaS 电商设计 (五) 私有化部署-实现 binlog 中间件适配

一、 背景

  具体的中间件私有化背景在上文 SaaS` 电商设计 (二) 私有化部署-缓存中间件适配 已有做相关介绍.这里具体讨论的场景是通过解析mysql binlog 来实现mysql到其他数据源的同步.具体比如:在电商的解决方案业务流中经常有 ES 的使用场景,用以解决一些复杂的查询和搜索商品的支持以及某些数据分析的场景.那就需要做到 mysql 数据库到 ES 的数据同步.在支持 mysqlES 数据同步的过程中,常用的技术方案有这样几种.

二、 设计主体

2.1 N种方案

方案1: 业务代码成功应答后操作目标数据源写入(本文用ES举例)
在这里插入图片描述

如上第一种方案在业务代码操作数据库, 异步执行 ES 数据同步写入.如:完成商品后写入数据,异步线程开启执行写入 ES 索引录入.

方案2:业务代码成功应答后,发送MQ,利用MQ来保证 ES 写入的最终一致
在这里插入图片描述

在第一种的方案中写入 ES 步骤中可能出现ES 写入失败case. 在方案一基础上为了保证可靠性引入 MQ ,保证在ES操作时出现异常抖动能够通过重试来保证数据的最终一致性.在业务代码中实际操作数据库后发送 MQ ,这边消费 MQ 执行 ES 数据同步.如:完成商品写入数据,发送消息 MQ , MQ consumer 消费写入 ES 索引录入.

方案 3.通过binlog 来实现数据库监听,保证数据同步脱离业务代码控制
在这里插入图片描述

  • 在大部分的场景下方案二完全能够满足业务诉求. 这样的一个方案在具体实施过程中存在两个点.

  • 业务开发的同时需要同步关心数据的同步
    在某种意义上来说,数据的同步并不是业务代码需要去关心的.业务代码永远关心的只是自身的逻辑实现,关注的是产品迭代过程中如何保证业务模型的可持续演进和领域资产沉淀.基于这个原则我的理解是需要把数据的同步从业务代码里进行剥离的.

    • 散落在各个业务代码角落的维护成本
      方案二的场景在很长时间的迭代过程中很可能就将出现这样的情况.商品添加进行商品的 ES 数据更新,门店添加进行门店的 ES 数据更新.诸如此类,长期迭代将得到大量的脚本代码,随着开发人员的更替,不断的迭代和开发.最终可能变成一座岌岌可危的高楼,开发人员小心翼翼的在原来的代码上继续裹上自己这版的裹脚布.维护性和成本指数上升.
      基于此我们尝试着借助 binlog 的这样一个工具来完善第二个方案适应更多索引更新,更加复杂的同步场景.首先 binlog 的形式能够通过仅监听数据库的 binlog 的消息来做到不同数据表数据更新的收口,我们可以在消费消息的入口来定义一个处理的接口,通过表名来进行不同表消费逻辑的实现.很简单就可以做到.一石二鸟做到数据处理的收口以及逻辑代码关于数据同步逻辑的抽离.

    方案4:完美终极方案(抽离技术细节的实现,做到binlog解析的接口和数据同步的接口化.)

在这里插入图片描述

对于第三种方式来说的话,接下来引入了第二个讨论的点.

  • 私有化支持
    就是在去做一些 SaaS 场景的私有化时,咱们再去做数据同步的时候不得不依赖 binlog ,那对于 binlog 的解析常见的工具也比较多.常见的开源的 canal ,各大厂里也有相应的工具,东厂的 DRC (前身binlake),福包厂的精卫.基于此在项目中不得不在这些不同的实现之上完成抽象.这样我们就能够在既支持到内部项目的数据监听,也能够完成项目实施私有化的场景部署.
  • 同步目标逻辑的不同支持
    在上文中我们提到的最多也就是关于 ES 数据的同步,那其实在实际的开发场景可能面临的更多,比如在数据库更新后的准实时缓存刷新,数据库写入商品成功后关于商品新建成功的三方消息同步.等等.同样我们在这个基础实现了一个接口,用来方便具体的使用方来进行具体消息处理.完美.

2.2 方案4 coding落地

2.2.1 类图

在这里插入图片描述
核心步骤:

step1:抽象MessageListener 实现 BinlogListener 完成 binlog 中间件解析发送的 MQ msg 得到反序列化的表数据.内含本次选取的反序列化类型.如:是canal 还是 DRC .
step2:抽象 BinlogClientAdapter 完成反序列化和处理msg接口定义.具体可以有 CanalBinlogAdapter,DrcBinlogClientAdapter实现.
step3:抽象BinlogDataHandler 完成具体表具体操作**(insert,delete,update,query)** 接口定义.具体在接入方进行实现MultiCloundBinLogDataHandler,这样在进行注入时得到具体的实现类,进行具体的实现操作.如:CategoryBinlogDataHandler.

2.2.2 核心实现

BinlogHandlerAdapter 完成 binlog client 接口定义.

package com.baixiu.middleware.binlog.adapter;import com.baixiu.middleware.binlog.model.BinlogData;
import com.baixiu.middleware.mq.model.CommonMessage;/*** binlog 适配器接口* 适配中间件list:canal,jingwei,drc等。* function1:完成不同中间件解析能力* function2:完成不同中间件handlerMsg能力* @author baixiu* @date 2023年12月11日*/
public interface BinlogHandlerAdapter {/*** 反序列MQMsg To binlogMsg* @param mqMsg mqMsg* @return*/BinlogData deserializationMQMsg(CommonMessage mqMsg);/*** 反序列MQMsg To binlogMsg* @param mqMsg mqMsg* @return*/void handleBinLogData(BinlogData binLogData) throws Exception;}

CanalBinlogHandlerAdapter 完成 canal 解析

package com.baixiu.middleware.binlog.adapter;import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.baixiu.middleware.binlog.consts.CommonConsts;
import com.baixiu.middleware.binlog.core.AbstractBinlogHandler;
import com.baixiu.middleware.binlog.core.BinlogTableHandlerRouter;
import com.baixiu.middleware.binlog.enums.CommonRowTypeEnum;
import com.baixiu.middleware.binlog.model.BinlogData;
import com.baixiu.middleware.binlog.model.BinlogDataToDiffModel;
import com.baixiu.middleware.binlog.model.BinlogTableRowDiffModel;
import com.baixiu.middleware.mq.model.CommonMessage;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** canal binlog handler adapter* 当property配置的clientType=canal时进行注入bean* canal client 用以解析 mq -starter 发送过来的消费消息* @author baixiu* @date 创建时间 2023/12/11 8:39 PM*/
@Slf4j
public class CanalBinlogHandlerAdapter implements BinlogHandlerAdapter{@Autowiredprivate BinlogTableHandlerRouter binlogTableHandlerRouter;@Overridepublic BinlogData deserializationMQMsg(CommonMessage mqMsg) {FlatMessage flatMessage = JSON.parseObject(mqMsg.getText(),FlatMessage.class);BinlogData binLogData=new BinlogData ();if(flatMessage!=null){binLogData.setBinlogDataObject(flatMessage);}return binLogData;}@Overridepublic void handleBinLogData(BinlogData binLogData) throws Exception {if(binLogData==null || binLogData.getBinlogDataObject()==null){return;}FlatMessage flatMessage= (FlatMessage) binLogData.getBinlogDataObject ();List<Map<String, String>> rowDatas = flatMessage.getData();List<Map<String, String>> oldDatas = flatMessage.getOld();String tableName = flatMessage.getTable();AbstractBinlogHandler handler = binlogTableHandlerRouter.ALL_TABLE_HANDLERS.get(tableName);for (int i = 0; i < rowDatas.size(); i++) {Map<String, String> rowData = rowDatas.get(i);Map<String, String> oldData = new HashMap<>(i,0.75f);if (oldDatas != null && oldDatas.size() == rowDatas.size()) {oldData = oldDatas.get(i);}Map<String, String> fieldsMaps = Maps.newHashMapWithExpectedSize(20);BinlogDataToDiffModel binlogDataToDiffModel = transRowDataToAllBinlogData(handler, rowData, oldData, fieldsMaps, flatMessage.getType());switch (binlogDataToDiffModel.getCommonRowTypeEnum()) {case INSERT:log.info("Canal.handleMessage.binlogTransConfigToMap.INSERT.{}", JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));handler.insert(binlogDataToDiffModel.getAllFieldMaps(),binlogDataToDiffModel.getBinlogTableRowDiffModels());break;case UPDATE:log.info("Canal.handleMessage.binlogTransConfigToMap.UPDATE.{}",JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));handler.update(binlogDataToDiffModel.getAllFieldMaps(),binlogDataToDiffModel.getBinlogTableRowDiffModels());break;case DELETE:Map<String, String> delMap = getBeforeColumnsFromBinlogData(handler, oldData);log.info("Canal.handleMessage.binlogTransConfigToMap.DELETE");handler.delete(delMap);break;default:log.info("CanalBinlogClientAdapter.handleMessage.binlogTransConfigToMap.default.{}",JSON.toJSONString(binlogDataToDiffModel.getCommonRowTypeEnum()));break;}}}public static BinlogDataToDiffModel transRowDataToAllBinlogData(AbstractBinlogHandler binlogData, Map<String, String> afterColumns, Map<String, String> beforeColumns, Map<String, String> fieldsMap, String type) {try {String[] updateFields = binlogData.getUpdateFields();String[] keyFields = binlogData.getFields();List<BinlogTableRowDiffModel> changeList = new ArrayList<> ();for (String key : afterColumns.keySet()) {if (keyFields.length == 1 && ArrayUtils.contains(keyFields, CommonConsts.BINLOG_ALL_FIELDS)) {fieldsMap.put(key, afterColumns.get(key));} else if (ArrayUtils.contains(keyFields, key)) {fieldsMap.put(key, afterColumns.get(key));}if (beforeColumns != null && !beforeColumns.isEmpty() && beforeColumns.get(key) != null) {BinlogTableRowDiffModel bean = new BinlogTableRowDiffModel();bean.setField(key);bean.setAfter(afterColumns.get(key));bean.setBefore(beforeColumns.get(key));if (updateFields.length == 1 && ArrayUtils.contains(updateFields,CommonConsts.BINLOG_ALL_FIELDS)) {changeList.add(bean);} else if (ArrayUtils.contains(updateFields, key)) {changeList.add(bean);}}}BinlogDataToDiffModel data = new BinlogDataToDiffModel(changeList, fieldsMap, CommonRowTypeEnum.transType(type));log.info("transRowDataToAllBinlogData.changeList:{}.fieldsMap{}.data{}",JSON.toJSONString(changeList), JSON.toJSONString(fieldsMap), JSON.toJSONString(data));return data;} catch (Exception e) {log.error("handleMessage.transRowDataToAllBinlogData.handleMessage.error.{}", JSON.toJSONString(binlogData), e);}return null;}/*** 删除操作* 不同的表需要从binlogData中获取的信息不同,这里抽取** @return*/private Map<String, String> getBeforeColumnsFromBinlogData(AbstractBinlogHandler binlogData, Map<String, String> beforeColumns) {Map<String, String> keys = new HashMap<>();if (beforeColumns != null && !beforeColumns.isEmpty()) {String[] keyFields = binlogData.getFields();for (String key : beforeColumns.keySet()) {// 找出关心的字段值if (ArrayUtils.contains(keyFields, key)) {keys.put(key, beforeColumns.get(key));}}}return keys;}
}

AbstractBinlogHandler 抽象binloghandler 处理类.

package com.baixiu.middleware.binlog.core;import com.baixiu.middleware.binlog.model.BinlogTableRowDiffModel;
import java.util.List;
import java.util.Map;/*** @author baixiu* @date 创建时间 2023/12/12 11:31 AM*/
public interface AbstractBinlogHandler {/*** 需要关心的字段。实现后将仅实现的字段值放置于 fieldValues 中* @return 监控字段*/String[] getFields();/*** 需要关心的变更字段。实现后将仅实现的字段值放置于 changeList 中* @return 更新字段*/String[] getUpdateFields();/*** 新增时触发* @param fieldValues 唯一字段,用于确定一条数据* @param changeList 字段的值发生变化的* @throws Exception 业务exception*/void insert(Map<String, String> fieldValues, List<BinlogTableRowDiffModel> changeList) throws Exception;/*** 数据修改时触发* @param fieldValues 实现了getFields接口里得到的字段里的字段以及字段的值* @param changeList  字段的值发生变化的* @throws Exception 业务exception*/void update(Map<String, String> fieldValues, List<BinlogTableRowDiffModel> changeList) throws Exception;/*** 删除时触发* @param fieldValues 唯一字段,用于确定一条数据* @throws Exception 业务exception*/void delete(Map<String, String> fieldValues) throws Exception;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/277971.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

安装python

1.下载python 选择版本 选择可执行文件安装包 2.安装 输入python检查是否安装成功

0基础学习VR全景平台篇第128篇:720VR全景拍摄设备推荐

上课&#xff01;全体起立~ 大家好&#xff0c;欢迎观看蛙色官方系列全景摄影课程&#xff01; 本篇教程&#xff0c;小编为大家推荐720VR全景拍摄所需要的设备器材。上节我们提到&#xff0c;理论上任意相机和镜头都能够拍摄全景&#xff0c;但为了标准化制作流程&#xff0…

Transformer的学习

文章目录 Transformer1.了解Seq2Seq任务2.Transformer 整体架构3.Encoder的运作方式4.Decoder的运作方式5.AT 与 NAT6.Encoder 和 Decoder 之间的互动7.Training Transformer 1.了解Seq2Seq任务 NLP 的问题&#xff0c;都可以看做是 QA&#xff08;Question Answering&#x…

字符串——OJ题

&#x1f4d8;北尘_&#xff1a;个人主页 &#x1f30e;个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上&#xff0c;不忘来时的初心 文章目录 一、字符串相加1、题目讲解2、思路讲解3、代码实现 二、仅仅反转字母1、题目讲解2、思路讲解3…

Java报错-Non-terminating decimal expansion; no exact representable decimal result

1. 背景 在使用 BigDecimal 的 divide() 对两个数相除时&#xff0c;报了如题的错误。 public class Test {public static void main(String[] args) {BigDecimal b1 new BigDecimal(1);BigDecimal b2 new BigDecimal(3);System.out.println(b1.divide(b2)); // Sys…

最新AI绘画Midjourney绘画提示词Prompt教程

一、Midjourney绘画工具 SparkAi【无需魔法使用】&#xff1a; sparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的…

动态内存管理,malloc和calloc以及realloc函数用法

目录 一.malloc函数的介绍 malloc的用法 举个例子 注意点 浅谈数据结构里的动态分配空间 二.calloc函数的介绍 三.realloc函数的介绍 四.柔性数组的介绍 为什么有些时候动态内存函数头文件是malloc.h,有些时候却是stdlib.h 一.malloc函数的介绍 malloc其实就是动态开辟…

linux内核使用ppm图片开机

什么是ppm图片 PPM&#xff08;Portable Pixmap&#xff09;是一种用于存储图像的文件格式。PPM图像文件以二进制或ASCII文本形式存储&#xff0c;并且是一种简单的、可移植的图像格式。PPM格式最初由Jef Poskanzer于1986年创建&#xff0c;并经过了多次扩展和修改。 PPM图像…

Appium 图像识别技术 OpenCV

在我们做App自动化测试的时候&#xff0c;会发现很多场景下元素没有id、content-desc、text等等属性&#xff0c;并且有可能也会碰到由于开发采用的是自定义View&#xff0c;View中的元素也无法识别到&#xff0c;很多的自动化测试框架对此类场景束手无策。Appium在V1.9.0中有给…

【AI美图】第02期效果图,AI人工智能全自动绘画,美图欣赏

今天给大家献上一组最新提示词 参照图生成图像 依据参照图生成新的图像需要掌握一些技巧&#xff0c;以下是一些可能有用的技巧&#xff1a; 观察参照图&#xff1a;在开始生成新图像之前&#xff0c;仔细观察参照图是非常重要的。你需要了解图像的布局、颜色、线条、细节等…

基于SSM的图书馆预约座位系统的设计与实现(部署+源码+LW)

项目描述 临近学期结束&#xff0c;还是毕业设计&#xff0c;你还在做java程序网络编程&#xff0c;期末作业&#xff0c;老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。今天给大家介绍一篇基于SSM的图书馆预约座位…

十六、YARN和MapReduce配置

1、部署前提 &#xff08;1&#xff09;配置前提 已经配置好Hadoop集群。 配置内容&#xff1a; &#xff08;2&#xff09;部署说明 &#xff08;3&#xff09;集群规划 2、修改配置文件 MapReduce &#xff08;1&#xff09;修改mapred-env.sh配置文件 export JAVA_HOM…