目录
- Mysql实时数据同步工具Alibaba Canal 使用
- Canal是什么?
- 工作原理
- 重要版本更新说明
- 环境准备
- 安装Canal
- window
- Java : Canal Client 集成
- 依赖
- 编码
- 工作流程
- 开启原生MQ
- RocketMQ 安装部署
- canal配置说明
- 1.1 canal.properties常用配置介绍:
- 2.common参数定义,比如可以将instance.properties的公用参数,抽取放置到这里,这样每个instance启动的时候就可以共享.(instance.properties配置定义优先级高于`canal.properties`)
- 1.2 `instance.properties`常用配置介绍
- 其他学习canal资料
个人主页: 【⭐️个人主页】
需要您的【💖 点赞+关注】支持 💯
Mysql实时数据同步工具Alibaba Canal 使用
📖 本文核心知识点:
- Canal 是什么
- 安装Canal 服务
- 使用Canal 客户端
- 原生集成数据MQ
- 同步数据客户端服务【业务】
Canal是什么?
canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费
早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份
- 索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
工作原理
MySQL主备复制原理
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal 工作原理 - canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
重要版本更新说明
canal 1.1.x 版本(release_note),性能与功能层面有较大的突破,重要提升包括:
- 整体性能测试&优化,提升了
150%
. #726 参考: Performance - 原生支持
prometheus
监控 #765 Prometheus QuickStart - 原生支持
kafka消息投递
#695 Canal Kafka/RocketMQ QuickStart - 原生支持aliyun rds的binlog订阅 (解决自动主备切换/oss binlog离线解析) 参考: Aliyun RDS QuickStart
- 原生支持
docker镜像
#801 参考: Docker QuickStart
canal1.1.4
版本,迎来最重要的WebUI能力,引入canal-admin
工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力, Canal admin guide
环境准备
安装Canal
DownLoad
版本: 1.1.7
window
- 下载 tar.gz包,解压
GitHub Canal - 配置文件设置:
解压完后修改配置文件
查看conf/canal.properties
,其中canal.port
是客户端连接的端口,需要放开,canal.admin.user
和canal.admin.passwd
是客户端连接的账号
再打开conf/example/ instance.properties
,master.address
填数据库地址,dbUsername
和dbPassword
是数据库账号,flter.regex
可以用来过滤数据库
,默认是监听所有数据库,如果想监听db_
开头的数据可以这么写db_.*\\..*
,多个用逗号分隔
- 启动服务
bin/startup.bat
log/canal.log
Java : Canal Client 集成
依赖
implementation 'com.alibaba.otter:canal.client:1.1.7'implementation 'com.alibaba.otter:canal.protocol:1.1.7'
具体的数据库数据变化 业务实现方面需要 自己手动去实现,仅展示自己使用的部分。
需要注意: 如果是多个客户端同时使用,要注意:多个客户端会出现某个客户端 把消息全部消费,而别的客户端没有消息消费的情况,这里需要特别注意
编码
package com.kongxiang.infrastructure.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ThreadUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;/*** @author 孔翔* @since 2023-12-27* copyright for author : 孔翔 at 2023-12-27* study-spring3*/
@Component
@Slf4j
public class CanalService {private String canalMonitorHost = "localhost";private int canalMonitorPort = 11111;private String filterRegexTable = "xkongdb\\..*";private final static int BATCH_SIZE = 10000;@Async("canalTask")public void startCanal() {Consumer<CanalConnector> connectorConsumer = new ConsumerTask();while (true) {executeCanal(connectorConsumer);try {//防止频繁访问数据库链接: 线程睡眠 10秒ThreadUtils.sleep(Duration.ofSeconds(10));log.debug("防止频繁访问数据库链接: 线程睡眠 10秒");} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}}}public void executeCanal(Consumer<CanalConnector> runnable) {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "admin", "4ACFE3202A5FF5CF467898FC58AAB1D615029441");try {//打开连接connector.connect();log.debug("数据库检测连接成功!" + filterRegexTable);//订阅数据库表,全部表qconnector.subscribe(filterRegexTable);//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();if (runnable != null) {runnable.accept(connector);}} catch (Exception e) {e.printStackTrace();log.error("成功断开监测连接!尝试重连");} finally {connector.disconnect();}}public static class ConsumerTask implements Consumer<CanalConnector> {public void handleMessage(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException {for (CanalEntry.Entry entry : entries) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}//根据数据库名获取租户名String databaseName = entry.getHeader().getSchemaName();String tableName = entry.getHeader().getTableName();log.info("数据库: {}, 表名: {}", databaseName, tableName);// 获取类型CanalEntry.EntryType entryType = entry.getEntryType();// 获取序列化后的数据ByteString storeValue = entry.getStoreValue();if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {// 反序列化数据CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);// 获取当前事件的操作类型CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE|| eventType == CanalEntry.EventType.DELETE) {// 获取数据集List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();// 遍历rowDataList,并打印数据集for (CanalEntry.RowData rowData : rowDataList) {List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();// 变更前数据for (CanalEntry.Column column : beforeColumnsList) {log.info("变更前数据: name: {}, value: {} ,update {}", column.getName(), column.getValue(), column.getUpdated());}// 变更后数据for (CanalEntry.Column column : afterColumnsList) {log.info("变更后数据: name: {}, value: {} ,update {}", column.getName(), column.getValue(), column.getUpdated());}}}}}}@Overridepublic void accept(CanalConnector connector) {while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(BATCH_SIZE);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {} else {try {log.debug("从canal接收到: {} 条消息,消息批次: {},开始处理", size, message.getId());handleMessage(message.getEntries());} catch (Exception e) {connector.rollback(batchId); // 处理失败, 回滚数据}}// 提交确认connector.ack(batchId);}}}
}
测试代码
@Test
public class CanalTest {@Testpublic void testListener() {CanalService canalService = new CanalService();canalService.startCanal();}
}
测试结果
- 当
xkongdb
的数据表的数据进行insert
,update
,delete
的时候,就会触发canal任务执行。 - 日志
工作流程
开启原生MQ
canal 1.1.1
版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:
kafka
RocketMQ
官网RocketMQ
RocketMQ 安装部署
使用MQ: RocketMQ 。 安装下载
这里使用版本: RocketMQ: 5.1.4
安装教程可以参考:
- RocketMQ(1) 基础介绍和单机-集群安装
- 官网教程
- 官网文档:部署方式
问题:windows11环境下rocketmq启动start mqnamesrv.cmd报错此时不应有 \rocketmq-all-5.0.0-ALPHA-bin-release\bin)
保证目录路径不能有 空格字符
canal配置说明
Canal的启动,是以创建实例(instance)的方式,每个实例都有自己单独的工作环境,
而配置也分成两个部分
canal.properties
(系统根配置文件)instance.properties
(instance级别的配置文件,每个instance一份)
1.1 canal.properties常用配置介绍:
参数名字 | 参数说明 | 默认值 |
---|---|---|
canal.destinations | 当前server上部署的instance列表 | 无 |
canal.conf.dir | conf/目录所在的路径 | …/conf |
canal.auto.scan | 开启instance自动扫描 如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发: a. instance目录新增: 触发instance配置载入,lazy为true时则自动启动 b. instance目录删除:卸载对应instance配置,如已启动则进行关闭 c. instance.properties文件变化:reload instance配置,如已启动自动进行重启操作 | true |
canal.auto.scan.interval | instance自动扫描的间隔时间,单位秒 | 5 |
canal.instance.global.mode | 全局配置加载方式 | spring |
canal.instance.global.lazy | 全局lazy模式 | false |
canal.instance.global.manager.address | 全局的manager配置方式的链接信息 | 无 |
canal.instance.global.spring.xml | 全局的spring配置方式的组件文件 | classpath:spring/memory-instance.xml (spring目录相对于canal.conf.dir) |
canal.instance.example.mode canal.instance.example.lazy canal.instance.example.spring.xml … | instance级别的配置定义,如有配置,会自动覆盖全局配置定义模式 命名规则:canal.instance.{name}.xxx | 无 |
canal.instance.tsdb.spring.xml | v1.0.25版本新增,全局的tsdb配置方式的组件文件 | classpath:spring/tsdb/h2-tsdb.xml (spring目录相对于canal.conf.dir) |
2.common参数定义,比如可以将instance.properties的公用参数,抽取放置到这里,这样每个instance启动的时候就可以共享.(instance.properties配置定义优先级高于canal.properties
)
参数名字 | 参数说明 | 默认值 |
---|---|---|
canal.id | 每个canal server实例的唯一标识,暂无实际意义 | 1 |
canal.ip | canal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务 | 无 |
canal.register.ip | canal server注册到外部zookeeper、admin的ip信息 (针对docker的外部可见ip) | 无 |
canal.port | canal server提供socket服务的端口 | 11111 |
canal.zkServers | canal server链接zookeeper集群的链接信息 例子:10.20.144.22:2181,10.20.144.51:2181 | 无 |
canal.zookeeper.flush.period | canal持久化数据到zookeeper上的更新频率,单位毫秒 | 1000 |
canal.instance.memory.batch.mode | canal内存store中数据缓存模式 1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量 2. MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小 | MEMSIZE |
canal.instance.memory.buffer.size | canal内存store中可缓存buffer记录数,需要为2的指数 | 16384 |
canal.instance.memory.buffer.memunit | 内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小 | 1024 |
canal.instance.transactionn.size | 最大事务完整解析的长度支持 超过该长度后,一个事务可能会被拆分成多次提交到canal store中,无法保证事务的完整可见性 | 1024 |
canal.instance.fallbackIntervalInSeconds | canal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒 说明:mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢 | 60 |
canal.instance.detecting.enable | 是否开启心跳检查 | false |
canal.instance.detecting.sql | 心跳检查sql | insert into retl.xdual values(1,now()) on duplicate key update x=now() |
canal.instance.detecting.interval.time | 心跳检查频率,单位秒 | 3 |
canal.instance.detecting.retry.threshold | 心跳检查失败重试次数 | 3 |
canal.instance.detecting.heartbeatHaEnable | 心跳检查失败后,是否开启自动mysql自动切换 说明:比如心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据 | false |
canal.instance.network.receiveBufferSize | 网络链接参数,SocketOptions.SO_RCVBUF | 16384 |
canal.instance.network.sendBufferSize | 网络链接参数,SocketOptions.SO_SNDBUF | 16384 |
canal.instance.network.soTimeout | 网络链接参数,SocketOptions.SO_TIMEOUT | 30 |
canal.instance.filter.druid.ddl | 是否使用druid处理所有的ddl解析来获取库和表名 | true |
canal.instance.filter.query.dcl | 是否忽略dcl语句 | false |
canal.instance.filter.query.dml | 是否忽略dml语句 (mysql5.6之后,在row模式下每条DML语句也会记录SQL到binlog中,可参考MySQL文档) | false |
canal.instance.filter.query.ddl | 是否忽略ddl语句 | false |
canal.instance.filter.table.error | 是否忽略binlog表结构获取失败的异常(主要解决回溯binlog时,对应表已被删除或者表结构和binlog不一致的情况) | false |
canal.instance.filter.rows | 是否dml的数据变更事件(主要针对用户只订阅ddl/dcl的操作) | false |
canal.instance.filter.transaction.entry | 是否忽略事务头和尾,比如针对写入kakfa的消息时,不需要写入TransactionBegin/Transactionend事件 | false |
canal.instance.binlog.format | 支持的binlog format格式列表 (otter会有支持format格式限制) | ROW,STATEMENT,MIXED |
canal.instance.binlog.image | 支持的binlog image格式列表 (otter会有支持format格式限制) | FULL,MINIMAL,NOBLOB |
canal.instance.get.ddl.isolation | ddl语句是否单独一个batch返回(比如下游dml/ddl如果做batch内无序并发处理,会导致结构不一致) | false |
canal.instance.parser.parallel | 是否开启binlog并行解析模式(串行解析资源占用少,但性能有瓶颈, 并行解析可以提升近2.5倍+) | true |
canal.instance.parser.parallelBufferSize | binlog并行解析的异步ringbuffer队列 (必须为2的指数) | 256 |
canal.instance.tsdb.enable | 是否开启tablemeta的tsdb能力 | true |
canal.instance.tsdb.dir | 主要针对h2-tsdb.xml时对应h2文件的存放目录,默认为conf/xx/h2.mv.db | c a n a l . f i l e . d a t a . d i r : . . / c o n f / {canal.file.data.dir:../conf}/ canal.file.data.dir:../conf/ |
canal.instance.tsdb.url | jdbc url的配置(h2的地址为默认值,如果是mysql需要自行定义) | jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL; |
canal.instance.tsdb.dbUsername | jdbc url的配置(h2的地址为默认值,如果是mysql需要自行定义) | canal |
canal.instance.tsdb.dbPassword | jdbc url的配置(h2的地址为默认值,如果是mysql需要自行定义) | canal |
canal.instance.rds.accesskey | aliyun账号的ak信息(如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值 | 无 |
canal.instance.rds.secretkey | aliyun账号的sk信息(如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值) | 无 |
canal.admin.manager | canal链接canal-admin的地址 (v1.1.4新增) | 无 |
canal.admin.port | admin管理指令链接端口 (v1.1.4新增) | 11110 |
canal.admin.user | admin管理指令链接的ACL配置 (v1.1.4新增) | admin |
canal.admin.passwd | admin管理指令链接的ACL配置 (v1.1.4新增) | 密码默认值为admin的密文 |
canal.user | canal数据端口订阅的ACL配置 (v1.1.4新增)如果为空,代表不开启 | 无 |
canal.passwd | canal数据端口订阅的ACL配置 (v1.1.4新增)如果为空,代表不开启 | 无 |
1.2 instance.properties
常用配置介绍
在canal.properties定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名的文件
比如:
·canal.destinations = example1,example2
这时需要创建example1和example2两个目录,每个目录里各自有一份instance.properties.
参数名字 | 参数说明 | 默认值 |
---|---|---|
canal.instance.mysql.slaveId | mysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一 (v1.1.x版本之后canal会自动生成,不需要手工指定) | 无 |
canal.instance.master.address | mysql主库链接地址 | 127.0.0.1:3306 |
canal.instance.master.journal.name | mysql主库链接时起始的binlog文件 | 无 |
canal.instance.master.position | mysql主库链接时起始的binlog偏移量 | 无 |
canal.instance.master.timestamp | mysql主库链接时起始的binlog的时间戳 | 无 |
canal.instance.gtidon | 是否启用mysql gtid的订阅模式 | false |
canal.instance.master.gtid | mysql主库链接时对应的gtid位点 | 无 |
canal.instance.dbUsername | mysql数据库帐号 | canal |
canal.instance.dbPassword | mysql数据库密码 | canal |
canal.instance.defaultDatabaseName | mysql链接时默认schema | |
canal.instance.connectionCharset | mysql 数据解析编码 | UTF-8 |
canal.instance.filter.regex | mysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠() 常见例子:1. 所有表:.* or .… 2. canal schema下所有表: canal…* 3. canal下的以canal打头的表:canal.canal.* 4. canal schema下的一张表:canal.test15. 多个规则组合使用:canal…*,mysql.test1,mysql.test2 (逗号分隔) | .… |
canal.instance.filter.black.regex | mysql 数据解析表的黑名单,表达式规则见白名单的规则 | 无 |
canal.instance.rds.instanceId | aliyun rds对应的实例id信息(如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值) | 无 |
其他学习canal资料
- 【开源实战】阿里开源MySQL中间件Canal快速入门
mysql的binlog开启方式 - Canal连接MQ
- RocketMQ(1) 基础介绍和单机-集群安装