Canal 结合 SpringBoot 源码梳理

1、canal是什么,可以用来作什么

canal是阿里开源的一个用于监听数据库binlog,从而实现数据同步的工具。

2、安装

我使用的是1.1.5版本,太高的版本需要的jdk版本和mysql的驱动版本会更高,可以根据自己的环境选择。
如果是自己玩的话安装 canal.deployer-1.1.5.tar.gz就可以了
地址: Release v1.1.5 · alibaba/canal · GitHub

3、springboot+mysql+canal实现数据同步可以在网上找到很多博客,不在赘述

4、源码梳理

(1)、既然用到springboot,肯定有一个自动注入的autoconfigure的start。

可以看到spring.factories会自动注入几个client。

(2)、找到一个看着顺眼的client进去看看:

  • 我选择的是SimpleClientAutoConfiguration
@Configuration
@EnableConfigurationProperties({CanalSimpleProperties.class})
@ConditionalOnBean({EntryHandler.class})
@ConditionalOnProperty(value = {"canal.mode"},havingValue = "simple",matchIfMissing = true
)
@Import({ThreadPoolAutoConfiguration.class})
public class SimpleClientAutoConfiguration {private CanalSimpleProperties canalSimpleProperties;public SimpleClientAutoConfiguration(CanalSimpleProperties canalSimpleProperties) {this.canalSimpleProperties = canalSimpleProperties;}@Beanpublic RowDataHandler<RowData> rowDataHandler() {return new RowDataHandlerImpl(new EntryColumnModelFactory());}@Bean@ConditionalOnProperty(value = {"canal.async"},havingValue = "true",matchIfMissing = true)public MessageHandler messageHandler(RowDataHandler<RowData> rowDataHandler, List<EntryHandler> entryHandlers, ExecutorService executorService) {return new AsyncMessageHandlerImpl(entryHandlers, rowDataHandler, executorService);}@Bean@ConditionalOnProperty(value = {"canal.async"},havingValue = "false")public MessageHandler messageHandler(RowDataHandler<RowData> rowDataHandler, List<EntryHandler> entryHandlers) {return new SyncMessageHandlerImpl(entryHandlers, rowDataHandler);}@Bean(initMethod = "start",destroyMethod = "stop")public SimpleCanalClient simpleCanalClient(MessageHandler messageHandler) {String server = this.canalSimpleProperties.getServer();String[] array = server.split(":");return SimpleCanalClient.builder().hostname(array[0]).port(Integer.parseInt(array[1])).destination(this.canalSimpleProperties.getDestination()).userName(this.canalSimpleProperties.getUserName()).password(this.canalSimpleProperties.getPassword()).messageHandler(messageHandler).batchSize(this.canalSimpleProperties.getBatchSize()).filter(this.canalSimpleProperties.getFilter()).timeout(this.canalSimpleProperties.getTimeout()).unit(this.canalSimpleProperties.getUnit()).build();}
}
看到会注入SimpleCanalClient。并且指明了初始化方法和销毁的方法。进去看看。发现是继承了一个抽象的client,这个类是关键,内部有start和stop的具体实现。
很明显,start就是启动一个线程 while(true)的去循环执行binlog的获取和处理。
如何获取的代码没有跟进,但是可以猜到,应该是通过连接然后去获取数据。
  • 这里着重看一下处理数据的代码:
public abstract class AbstractMessageHandler implements MessageHandler<Message> {private Map<String, EntryHandler> tableHandlerMap;private RowDataHandler<CanalEntry.RowData> rowDataHandler;public  AbstractMessageHandler(List<? extends EntryHandler> entryHandlers, RowDataHandler<CanalEntry.RowData> rowDataHandler) {this.tableHandlerMap = HandlerUtil.getTableHandlerMap(entryHandlers);this.rowDataHandler = rowDataHandler;}@Overridepublic  void handleMessage(Message message) {List<CanalEntry.Entry> entries = message.getEntries();  第一步 for (CanalEntry.Entry entry : entries) {if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) {  第二步try {EntryHandler<?> entryHandler = HandlerUtil.getEntryHandler(tableHandlerMap, entry.getHeader().getTableName());   第三步if(entryHandler!=null){CanalModel model = CanalModel.Builder.builder().id(message.getId()).table(entry.getHeader().getTableName()).executeTime(entry.getHeader().getExecuteTime()).database(entry.getHeader().getSchemaName()).build();CanalContext.setModel(model);CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();    第四步CanalEntry.EventType eventType = rowChange.getEventType();for (CanalEntry.RowData rowData : rowDataList) {rowDataHandler.handlerRowData(rowData,entryHandler,eventType);}}} catch (Exception e) {throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);}finally {CanalContext.removeModel();}}}}}
  • 进入rowDataHandler.handlerRowData(maps, entryHandler, eventType);实现类选择的是RowDataHandlerImpl。
public class RowDataHandlerImpl implements RowDataHandler<CanalEntry.RowData> {private IModelFactory<List<CanalEntry.Column>> modelFactory;public RowDataHandlerImpl(IModelFactory modelFactory) {this.modelFactory = modelFactory;}@Overridepublic <R> void handlerRowData(CanalEntry.RowData rowData, EntryHandler<R> entryHandler, CanalEntry.EventType eventType) throws Exception {if (entryHandler != null) {switch (eventType) {case INSERT:R object = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());entryHandler.insert(object);break;case UPDATE:Set<String> updateColumnSet = rowData.getAfterColumnsList().stream().filter(CanalEntry.Column::getUpdated).map(CanalEntry.Column::getName).collect(Collectors.toSet());R before = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList(),updateColumnSet);R after = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());entryHandler.update(before, after);break;case DELETE:R o = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList());entryHandler.delete(o);break;default:break;}}}
}
回想一下springboot中使用canal的时候,会有一个注解@CanalTable和一个实现类EntryHandler。
这里的代码要做的就是(1)、匹配合适的语句类型(insert、delete、update)。(2)、insert和delete只需要记录一下操作的值;update需要记录一下修改前和修改后的值。也很好理解,insert和delete回滚只需要反向重放代码就行,而update需要知道之前的数据采集重新update。
  • 进入newInstance方法,选择AbstractModelFactory:
public abstract class AbstractModelFactory<T> implements IModelFactory<T> {@Overridepublic <R> R newInstance(EntryHandler entryHandler, T t) throws Exception {String canalTableName = HandlerUtil.getCanalTableName(entryHandler);if (TableNameEnum.ALL.name().toLowerCase().equals(canalTableName)) {return (R) t;}Class<R> tableClass = GenericUtil.getTableClass(entryHandler);if (tableClass != null) {return newInstance(tableClass, t);}return null;}abstract <R> R newInstance(Class<R> c, T t) throws Exception;
}

重点来了,有两个HandlerUtil.getCanalTableName和GenericUtil.getTableClass。还记得咱们再springboot中的代码会指定 @CanalTable 处理的是那个表和EntryHandler泛型吗。
第一步判断这个EntryHandler实现类有没有指定要处理那个表,如果指定了All。那么就要就走自定义的返回值,这个返回值通常不是我们需要的。所以在使用中一定尽量指定要处理的表。
第二步需要匹配EntryHandler中的泛型类进行赋值操作了。
  • 最后进入newInstance方法:
public class EntryColumnModelFactory extends AbstractModelFactory<List<CanalEntry.Column>> {......@Override<R> R newInstance(Class<R> c, List<CanalEntry.Column> columns) throws Exception {R object = c.newInstance();Map<String, String> columnNames = EntryUtil.getFieldName(object.getClass());for (CanalEntry.Column column : columns) {String fieldName = columnNames.get(column.getName());if (StringUtils.isNotEmpty(fieldName)) {FieldUtil.setFieldValue(object, fieldName, column.getValue());}}return object;}}

        代码比较简单,通过反射给对象赋值。如果不太清楚这里是怎么把数据解析出来的,可以自己搭建起来服务执行一下看看canal返回的结构体,我下边也提出来我的返回,并且我也会将上边代码中和数据解析的地方标红。
获取消息 Message[id=14,entries=[header {version: 1logfileName: "mysql-bin.000004"logfileOffset: 19806serverId: 1serverenCode: "UTF-8"executeTime: 1706838103000sourceType: MYSQLschemaName: ""tableName: ""eventLength: 80
}
entryType: TRANSACTIONBEGIN
storeValue: " 9"
, header {version: 1logfileName: "mysql-bin.000004"logfileOffset: 19939serverId: 1serverenCode: "UTF-8"executeTime: 1706838103000sourceType: MYSQLschemaName: "test"tableName: "first"eventLength: 53eventType: INSERTprops {key: "rowsCount"value: "1"}
}
entryType: ROWDATA
storeValue: "\b\341\001\020\001P\000b\203\001\022\"\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0010\000B\0016R\006bigint\022%\b\001\020\f\032\aaddress \000(\0010\000B\003333R\vvarchar(10)\0226\b\002\020]\032\vcreate_time \000(\0010\000B\0232024-02-02 09:41:43R\bdatetime"
, header {version: 1logfileName: "mysql-bin.000004"logfileOffset: 19992serverId: 1serverenCode: "UTF-8"executeTime: 1706838103000sourceType: MYSQLschemaName: ""tableName: ""eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\006381841"
],raw=false,rawEntries=[]]

至此,在springboot中通过canal获取binlog的日志并且解析为自定义的entry对象的流程就已经分析、梳理完了。至于后续要怎么处理就有很多的方式了。
最后在分享一个idea跟踪源码的小技巧:
比如我们看到一个比较重要的注解,但是不知道这个注解具体实现在哪里,可以进入注解中,选中注解名称,然后选择Find Usages。就可以看到哪里使用了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/451243.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

开源浏览器Firefox:使用Docker本地部署并远程访问进行测试

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;网络奇遇记、数据结构 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 &#x1f4cb;前言一. 部署Firefox二. 本地访问Firefox三. Linux安装Cpolar四. 配置Firefox公网地址…

Javaweb之SpringBootWeb案例之 @ConfigurationProperties的详细解析

4.3 ConfigurationProperties 讲解完了yml配置文件之后&#xff0c;最后再来介绍一个注解ConfigurationProperties。在介绍注解之前&#xff0c;我们先来看一个场景&#xff0c;分析下代码当中可能存在的问题&#xff1a; 我们在application.properties或者application.yml中配…

我用全志V851s做了一个魔法棒,使用Keras训练手势识别模型控制一切电子设备

这是一个可以直接启动原神的魔法棒~ 原神&#xff0c;启动&#xff01; 这是一个万全的解决方案&#xff01;只需要花80元再动动手&#xff0c;就可以将哈利波特的魔杖与人工智能结合到一起&#xff01;它就是用全志V851s做的赛博魔杖&#xff01; 这个魔法手杖有啥亮点 手势…

Python爬虫requests库详解

使用 requests 上一节中&#xff0c;我们了解了 urllib 的基本用法&#xff0c;但是其中确实有不方便的地方&#xff0c;比如处理网页验证和 Cookies 时&#xff0c;需要写 Opener 和 Handler 来处理。为了更加方便地实现这些操作&#xff0c;就有了更为强大的库 requests&…

AIGC 为何能火爆全网,赋能智能时代?

Hi&#xff0c;大家好&#xff0c;我是半亩花海。2023年&#xff0c;人工智能新浪潮涌起&#xff0c;AIGC 火爆全网&#xff0c;不断赋能各大行业。从短视频平台上火爆的“AI 绘画”&#xff0c;到智能聊天软件 ChatGPT&#xff0c;都引起了大家的广泛关注。那么 AIGC 到底是什…

Redis 的持久化机制是什么?各自的优缺点?

Redis 提供两种持久化机制 RDB&#xff08;默认&#xff09; 和 AOF 机制: RDB&#xff1a;是Redis DataBase缩写快照 RDB是Redis默认的持久化方式。按照一定的时间将内存的数据以快照的形式保存到硬盘中&#xff0c;对应产生的数据文件为dump.rdb。通过配置文件中的save参数来…

初识vue3

文章目录 1.Vue3的好处2.create-vue搭建vue3项目3.项目目录和关键文件4.组合式API - setup选项5.组合式API - reactive和ref函数①reactive②ref() 6.组合式API - computed7.组合式API - watch①基础使用 - 侦听单个数据②基础使用 - 侦听多个数据③immediate④精确侦听对象的某…

集成电路中电容的重要性

集成电路电容是用于存储电能的一种元件&#xff0c;基本原理是充电放电&#xff0c;通交流隔直流。即在直流系统中&#xff0c;电容起到“断路”作用&#xff0c;直流信号无法通过电容&#xff0c;在交流系统中&#xff0c;电容起到“短路”作用&#xff0c;即交流信号可以通过…

2024年【N1叉车司机】考试技巧及N1叉车司机模拟试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 N1叉车司机考试技巧考前必练&#xff01;安全生产模拟考试一点通每个月更新N1叉车司机模拟试题题目及答案&#xff01;多做几遍&#xff0c;其实通过N1叉车司机证考试很简单。 1、【多选题】《中华人民共和国特种设备…

网络攻防模拟与城市安全演练 | 图扑数字孪生

在数字化浪潮的推动下&#xff0c;网络攻防模拟和城市安全演练成为维护社会稳定的不可或缺的环节。基于数字孪生技术我们能够在虚拟环境中进行高度真实的网络攻防模拟&#xff0c;为安全专业人员提供实战经验&#xff0c;从而提升应对网络威胁的能力。同时&#xff0c;在城市安…

思特威用的什么ERP系统

思特威&#xff0c;作为全球知名的科技企业&#xff0c;其成功离不开高效的企业资源管理。这就不得不提其所采用的ERP系统&#xff0c;一个对企业运营起到核心作用的信息管理平台。 ERP&#xff0c;即企业资源计划&#xff0c;是一种集成化管理思想和方法&#xff0c;它以信息技…

C语言在Visual Studio 2010环境下使用<regex.h>正则表达式函数库

在Visual Studio 2010环境下&#xff0c;如果C语言想要使用<regex.h>头文件进行正则表达式匹配&#xff0c;则需要pcre3.dll这个动态链接库&#xff0c;可以去网上下载。 下载的网址是&#xff1a;Pcre for Windowspcre {whatisit}https://gnuwin32.sourceforge.net/pac…