Mysql实时数据同步工具Alibaba Canal 使用

目录

  • Mysql实时数据同步工具Alibaba Canal 使用
    • Canal是什么?
      • 工作原理
      • 重要版本更新说明
    • 环境准备
    • 安装Canal
      • window
    • Java : Canal Client 集成
      • 依赖
      • 编码
    • 工作流程
    • 开启原生MQ
      • RocketMQ 安装部署
    • canal配置说明
      • 1.1 canal.properties常用配置介绍:
      • 2.common参数定义,比如可以将instance.properties的公用参数,抽取放置到这里,这样每个instance启动的时候就可以共享.(instance.properties配置定义优先级高于`canal.properties`)
      • 1.2 `instance.properties`常用配置介绍
    • 其他学习canal资料

个人主页: 【⭐️个人主页】
需要您的【💖 点赞+关注】支持 💯


在这里插入图片描述

Mysql实时数据同步工具Alibaba Canal 使用

📖 本文核心知识点:

  • Canal 是什么
  • 安装Canal 服务
  • 使用Canal 客户端
  • 原生集成数据MQ
  • 同步数据客户端服务【业务】

Canal是什么?

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

工作原理

MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
    canal 工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

重要版本更新说明

canal 1.1.x 版本(release_note),性能与功能层面有较大的突破,重要提升包括:

  • 整体性能测试&优化,提升了150%. #726 参考: Performance
  • 原生支持prometheus监控 #765 Prometheus QuickStart
  • 原生支持kafka消息投递 #695 Canal Kafka/RocketMQ QuickStart
  • 原生支持aliyun rds的binlog订阅 (解决自动主备切换/oss binlog离线解析) 参考: Aliyun RDS QuickStart
  • 原生支持docker镜像 #801 参考: Docker QuickStart
    canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力, Canal admin guide

环境准备

安装Canal

DownLoad

版本: 1.1.7

window

  1. 下载 tar.gz包,解压
    GitHub Canal
  2. 配置文件设置:
    解压完后修改配置文件
    查看conf/canal.properties,其中canal.port是客户端连接的端口,需要放开,canal.admin.usercanal.admin.passwd是客户端连接的账号
    在这里插入图片描述
    再打开conf/example/ instance.properties, master.address填数据库地址,dbUsernamedbPassword是数据库账号,flter.regex可以用来过滤数据库,默认是监听所有数据库,如果想监听db_开头的数据可以这么写db_.*\\..*,多个用逗号分隔
    在这里插入图片描述
  3. 启动服务 bin/startup.bat
    log/canal.log
    在这里插入图片描述

Java : Canal Client 集成

依赖

    implementation 'com.alibaba.otter:canal.client:1.1.7'implementation 'com.alibaba.otter:canal.protocol:1.1.7'

具体的数据库数据变化 业务实现方面需要 自己手动去实现,仅展示自己使用的部分。

需要注意: 如果是多个客户端同时使用,要注意:多个客户端会出现某个客户端 把消息全部消费,而别的客户端没有消息消费的情况,这里需要特别注意

编码

package com.kongxiang.infrastructure.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ThreadUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;/*** @author 孔翔* @since 2023-12-27* copyright for author : 孔翔 at 2023-12-27* study-spring3*/
@Component
@Slf4j
public class CanalService {private String canalMonitorHost = "localhost";private int canalMonitorPort = 11111;private String filterRegexTable = "xkongdb\\..*";private final static int BATCH_SIZE = 10000;@Async("canalTask")public void startCanal() {Consumer<CanalConnector> connectorConsumer = new ConsumerTask();while (true) {executeCanal(connectorConsumer);try {//防止频繁访问数据库链接: 线程睡眠 10秒ThreadUtils.sleep(Duration.ofSeconds(10));log.debug("防止频繁访问数据库链接: 线程睡眠 10秒");} catch (InterruptedException e) {e.printStackTrace();Thread.currentThread().interrupt();}}}public void executeCanal(Consumer<CanalConnector> runnable) {CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalMonitorHost, canalMonitorPort), "example", "admin", "4ACFE3202A5FF5CF467898FC58AAB1D615029441");try {//打开连接connector.connect();log.debug("数据库检测连接成功!" + filterRegexTable);//订阅数据库表,全部表qconnector.subscribe(filterRegexTable);//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();if (runnable != null) {runnable.accept(connector);}} catch (Exception e) {e.printStackTrace();log.error("成功断开监测连接!尝试重连");} finally {connector.disconnect();}}public static class ConsumerTask implements Consumer<CanalConnector> {public void handleMessage(List<CanalEntry.Entry> entries) throws InvalidProtocolBufferException {for (CanalEntry.Entry entry : entries) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}//根据数据库名获取租户名String databaseName = entry.getHeader().getSchemaName();String tableName = entry.getHeader().getTableName();log.info("数据库: {}, 表名: {}", databaseName, tableName);// 获取类型CanalEntry.EntryType entryType = entry.getEntryType();// 获取序列化后的数据ByteString storeValue = entry.getStoreValue();if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {// 反序列化数据CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);// 获取当前事件的操作类型CanalEntry.EventType eventType = rowChange.getEventType();if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE|| eventType == CanalEntry.EventType.DELETE) {// 获取数据集List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();// 遍历rowDataList,并打印数据集for (CanalEntry.RowData rowData : rowDataList) {List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();// 变更前数据for (CanalEntry.Column column : beforeColumnsList) {log.info("变更前数据: name: {}, value: {} ,update {}", column.getName(), column.getValue(), column.getUpdated());}// 变更后数据for (CanalEntry.Column column : afterColumnsList) {log.info("变更后数据: name: {}, value: {} ,update {}", column.getName(), column.getValue(), column.getUpdated());}}}}}}@Overridepublic void accept(CanalConnector connector) {while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(BATCH_SIZE);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {} else {try {log.debug("从canal接收到: {} 条消息,消息批次: {},开始处理", size, message.getId());handleMessage(message.getEntries());} catch (Exception e) {connector.rollback(batchId); // 处理失败, 回滚数据}}// 提交确认connector.ack(batchId);}}}
}

测试代码

@Test
public class CanalTest {@Testpublic void testListener() {CanalService canalService = new CanalService();canalService.startCanal();}
}

测试结果

  1. xkongdb的数据表的数据进行 insert,update,delete的时候,就会触发canal任务执行。
  2. 日志 在这里插入图片描述

工作流程

在这里插入图片描述

开启原生MQ

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:

  • kafka
  • RocketMQ官网RocketMQ

RocketMQ 安装部署

使用MQ: RocketMQ 。 安装下载
这里使用版本: RocketMQ: 5.1.4
安装教程可以参考:

  • RocketMQ(1) 基础介绍和单机-集群安装
  • 官网教程
  • 官网文档:部署方式

问题:windows11环境下rocketmq启动start mqnamesrv.cmd报错此时不应有 \rocketmq-all-5.0.0-ALPHA-bin-release\bin)
保证目录路径不能有 空格字符

canal配置说明

Canal的启动,是以创建实例(instance)的方式,每个实例都有自己单独的工作环境,

而配置也分成两个部分

  • canal.properties (系统根配置文件)
  • instance.properties (instance级别的配置文件,每个instance一份)

1.1 canal.properties常用配置介绍:

参数名字参数说明默认值
canal.destinations当前server上部署的instance列表
canal.conf.dirconf/目录所在的路径…/conf
canal.auto.scan开启instance自动扫描 如果配置为true,canal.conf.dir目录下的instance配置变化会自动触发: a. instance目录新增: 触发instance配置载入,lazy为true时则自动启动 b. instance目录删除:卸载对应instance配置,如已启动则进行关闭 c. instance.properties文件变化:reload instance配置,如已启动自动进行重启操作true
canal.auto.scan.intervalinstance自动扫描的间隔时间,单位秒5
canal.instance.global.mode全局配置加载方式spring
canal.instance.global.lazy全局lazy模式false
canal.instance.global.manager.address全局的manager配置方式的链接信息
canal.instance.global.spring.xml全局的spring配置方式的组件文件classpath:spring/memory-instance.xml (spring目录相对于canal.conf.dir)
canal.instance.example.mode canal.instance.example.lazy canal.instance.example.spring.xml …instance级别的配置定义,如有配置,会自动覆盖全局配置定义模式 命名规则:canal.instance.{name}.xxx
canal.instance.tsdb.spring.xmlv1.0.25版本新增,全局的tsdb配置方式的组件文件classpath:spring/tsdb/h2-tsdb.xml (spring目录相对于canal.conf.dir)

2.common参数定义,比如可以将instance.properties的公用参数,抽取放置到这里,这样每个instance启动的时候就可以共享.(instance.properties配置定义优先级高于canal.properties)

参数名字参数说明默认值
canal.id每个canal server实例的唯一标识,暂无实际意义1
canal.ipcanal server绑定的本地IP信息,如果不配置,默认选择一个本机IP进行启动服务
canal.register.ipcanal server注册到外部zookeeper、admin的ip信息 (针对docker的外部可见ip)
canal.portcanal server提供socket服务的端口11111
canal.zkServerscanal server链接zookeeper集群的链接信息 例子:10.20.144.22:2181,10.20.144.51:2181
canal.zookeeper.flush.periodcanal持久化数据到zookeeper上的更新频率,单位毫秒1000
canal.instance.memory.batch.modecanal内存store中数据缓存模式 1. ITEMSIZE : 根据buffer.size进行限制,只限制记录的数量 2. MEMSIZE : 根据buffer.size * buffer.memunit的大小,限制缓存记录的大小MEMSIZE
canal.instance.memory.buffer.sizecanal内存store中可缓存buffer记录数,需要为2的指数16384
canal.instance.memory.buffer.memunit内存记录的单位大小,默认1KB,和buffer.size组合决定最终的内存使用大小1024
canal.instance.transactionn.size最大事务完整解析的长度支持 超过该长度后,一个事务可能会被拆分成多次提交到canal store中,无法保证事务的完整可见性1024
canal.instance.fallbackIntervalInSecondscanal发生mysql切换时,在新的mysql库上查找binlog时需要往前查找的时间,单位秒 说明:mysql主备库可能存在解析延迟或者时钟不统一,需要回退一段时间,保证数据不丢60
canal.instance.detecting.enable是否开启心跳检查false
canal.instance.detecting.sql心跳检查sqlinsert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.interval.time心跳检查频率,单位秒3
canal.instance.detecting.retry.threshold心跳检查失败重试次数3
canal.instance.detecting.heartbeatHaEnable心跳检查失败后,是否开启自动mysql自动切换 说明:比如心跳检查失败超过阀值后,如果该配置为true,canal就会自动链到mysql备库获取binlog数据false
canal.instance.network.receiveBufferSize网络链接参数,SocketOptions.SO_RCVBUF16384
canal.instance.network.sendBufferSize网络链接参数,SocketOptions.SO_SNDBUF16384
canal.instance.network.soTimeout网络链接参数,SocketOptions.SO_TIMEOUT30
canal.instance.filter.druid.ddl是否使用druid处理所有的ddl解析来获取库和表名true
canal.instance.filter.query.dcl是否忽略dcl语句false
canal.instance.filter.query.dml是否忽略dml语句 (mysql5.6之后,在row模式下每条DML语句也会记录SQL到binlog中,可参考MySQL文档)false
canal.instance.filter.query.ddl是否忽略ddl语句false
canal.instance.filter.table.error是否忽略binlog表结构获取失败的异常(主要解决回溯binlog时,对应表已被删除或者表结构和binlog不一致的情况)false
canal.instance.filter.rows是否dml的数据变更事件(主要针对用户只订阅ddl/dcl的操作)false
canal.instance.filter.transaction.entry是否忽略事务头和尾,比如针对写入kakfa的消息时,不需要写入TransactionBegin/Transactionend事件false
canal.instance.binlog.format支持的binlog format格式列表 (otter会有支持format格式限制)ROW,STATEMENT,MIXED
canal.instance.binlog.image支持的binlog image格式列表 (otter会有支持format格式限制)FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolationddl语句是否单独一个batch返回(比如下游dml/ddl如果做batch内无序并发处理,会导致结构不一致)false
canal.instance.parser.parallel是否开启binlog并行解析模式(串行解析资源占用少,但性能有瓶颈, 并行解析可以提升近2.5倍+)true
canal.instance.parser.parallelBufferSizebinlog并行解析的异步ringbuffer队列 (必须为2的指数)256
canal.instance.tsdb.enable是否开启tablemeta的tsdb能力true
canal.instance.tsdb.dir主要针对h2-tsdb.xml时对应h2文件的存放目录,默认为conf/xx/h2.mv.db c a n a l . f i l e . d a t a . d i r : . . / c o n f / {canal.file.data.dir:../conf}/ canal.file.data.dir:../conf/
canal.instance.tsdb.urljdbc url的配置(h2的地址为默认值,如果是mysql需要自行定义)jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsernamejdbc url的配置(h2的地址为默认值,如果是mysql需要自行定义)canal
canal.instance.tsdb.dbPasswordjdbc url的配置(h2的地址为默认值,如果是mysql需要自行定义)canal
canal.instance.rds.accesskeyaliyun账号的ak信息(如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值
canal.instance.rds.secretkeyaliyun账号的sk信息(如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值)
canal.admin.managercanal链接canal-admin的地址 (v1.1.4新增)
canal.admin.portadmin管理指令链接端口 (v1.1.4新增)11110
canal.admin.useradmin管理指令链接的ACL配置 (v1.1.4新增)admin
canal.admin.passwdadmin管理指令链接的ACL配置 (v1.1.4新增)密码默认值为admin的密文
canal.usercanal数据端口订阅的ACL配置 (v1.1.4新增)如果为空,代表不开启
canal.passwdcanal数据端口订阅的ACL配置 (v1.1.4新增)如果为空,代表不开启

1.2 instance.properties常用配置介绍

在canal.properties定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名的文件

比如:

·canal.destinations = example1,example2
这时需要创建example1和example2两个目录,每个目录里各自有一份instance.properties.

参数名字参数说明默认值
canal.instance.mysql.slaveIdmysql集群配置中的serverId概念,需要保证和当前mysql集群中id唯一 (v1.1.x版本之后canal会自动生成,不需要手工指定)
canal.instance.master.addressmysql主库链接地址127.0.0.1:3306
canal.instance.master.journal.namemysql主库链接时起始的binlog文件
canal.instance.master.positionmysql主库链接时起始的binlog偏移量
canal.instance.master.timestampmysql主库链接时起始的binlog的时间戳
canal.instance.gtidon是否启用mysql gtid的订阅模式false
canal.instance.master.gtidmysql主库链接时对应的gtid位点
canal.instance.dbUsernamemysql数据库帐号canal
canal.instance.dbPasswordmysql数据库密码canal
canal.instance.defaultDatabaseNamemysql链接时默认schema
canal.instance.connectionCharsetmysql 数据解析编码UTF-8
canal.instance.filter.regexmysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠() 常见例子:1. 所有表:.* or . 2. canal schema下所有表: canal…* 3. canal下的以canal打头的表:canal.canal.* 4. canal schema下的一张表:canal.test15. 多个规则组合使用:canal…*,mysql.test1,mysql.test2 (逗号分隔).
canal.instance.filter.black.regexmysql 数据解析表的黑名单,表达式规则见白名单的规则
canal.instance.rds.instanceIdaliyun rds对应的实例id信息(如果不需要在本地binlog超过18小时被清理后自动下载oss上的binlog,可以忽略该值)

其他学习canal资料

  • 【开源实战】阿里开源MySQL中间件Canal快速入门
    mysql的binlog开启方式
  • Canal连接MQ
  • RocketMQ(1) 基础介绍和单机-集群安装

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/311768.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2023-12-20 LeetCode每日一题(判别首字母缩略词)

2023-12-20每日一题 一、题目编号 2828. 判别首字母缩略词二、题目链接 点击跳转到题目位置 三、题目描述 给你一个字符串数组 words 和一个字符串 s &#xff0c;请你判断 s 是不是 words 的 首字母缩略词 。 如果可以按顺序串联 words 中每个字符串的第一个字符形成字符…

【深度学习】Normalizing flow原理推导+Pytorch实现

1、前言 N o r m a l i z i n g f l o w \boxed{Normalizing \hspace{0.1cm} flow} Normalizingflow​&#xff0c;流模型&#xff0c;一种能够与目前流行的生成模型—— G A N 、 V A E \boxed{\mathbf{GAN、VAE}} GAN、VAE​相媲美的模型。其也是一个生成模型&#xff0c;可是…

ZYNQ 7020 之 FPGA知识点重塑笔记一——串口通信

目录 一&#xff1a;串口通信简介 二&#xff1a;三种常见的数据通信方式—RS232串口通信 2.1 实验任务 2.2 串口接收模块的设计 2.2.1 代码设计 2.3 串口发送模块的设计 2.3.1 代码设计 2.4 顶层模块编写 2.4.1 代码设计 2.4.2 仿真验证代码 2.4.3 仿真结果 2.4.4…

阿里云PolarDB数据库优惠价格表11元一天起

阿里云数据库PolarDB租用价格表&#xff0c;云数据库PolarDB MySQL版2核4GB&#xff08;通用&#xff09;、2个节点、60 GB存储空间55元5天&#xff0c;云数据库 PolarDB 分布式版标准版2核16G&#xff08;通用&#xff09;57.6元3天&#xff0c;阿里云百科aliyunbaike.com分享…

uni-app js语法

锋哥原创的uni-app视频教程&#xff1a; 2023版uniapp从入门到上天视频教程(Java后端无废话版)&#xff0c;火爆更新中..._哔哩哔哩_bilibili2023版uniapp从入门到上天视频教程(Java后端无废话版)&#xff0c;火爆更新中...共计23条视频&#xff0c;包括&#xff1a;第1讲 uni…

Lesson 06 vector类(上)

C&#xff1a;渴望力量吗&#xff0c;少年&#xff1f; 文章目录 一、vector是什么&#xff1f;二、vector的使用1. 构造函数2. vector iterator3. vector 空间增长问题4. vector增删查改 三、vector实际使用 一、vector是什么&#xff1f; vector是表示可变大小数组的序列容器…

DFS

目录 DFS 实现数字全排列 N 皇后问题 DFS 算法的理解 优先考虑深度&#xff0c;换句话说就是一条路走到黑&#xff0c;直到无路可走的情况下&#xff0c;才会选择回头&#xff0c;然后重新选择一条路。空间复杂度&#xff1a;O&#xff08;h&#xff09;和高度成正比 不具…

设计模式——适配器模式(Adapter Pattern)

概述 适配器模式可以将一个类的接口和另一个类的接口匹配起来&#xff0c;而无须修改原来的适配者接口和抽象目标类接口。适配器模式(Adapter Pattern)&#xff1a;将一个接口转换成客户希望的另一个接口&#xff0c;使接口不兼容的那些类可以一起工作&#xff0c;其别名为包装…

UE蓝图 RPG动作游戏(一) day15

角色状态制作 制作角色动画混合空间 创建一个动混合空间 添加动作在混合空间 动画蓝图 创建一个动画蓝图 先使用混合空间进行移动&#xff0c;后续优化后再使用状态机 编写垂直水平速度逻辑初始化&#xff0c;获取到此动画的角色组件 获取Horizontal与Vertical的速度逻辑 …

C语言之指针和函数

目录 作为函数参数的指针 二值互换 scanf函数和指针 指针的类型 空指针 标量型 在C语言程序中&#xff0c;指针的一个重要作用就是作为函数参数使用&#xff0c;下面我们就来学习作为函数参数的指针的相关内容。 作为函数参数的指针 假如我有一个神奇的能力&#xff0c;能…

【Vue2+3入门到实战】(18)VUE之Vuex状态管理器概述、VueX的安装、核心概念 State状态代码实现 详细讲解

目录 一、[Vuex](https://vuex.vuejs.org/zh/) 概述1.是什么2.使用场景3.优势4.注意&#xff1a; 二、需求: 多组件共享数据1.创建项目2.创建三个组件, 目录如下3.源代码如下 三、vuex 的使用 - 创建仓库1.安装 vuex2.新建 store/index.js 专门存放 vuex3.创建仓库 store/index…

骑砍战团MOD开发(30)-游戏大地图map.txt

骑砍1战团mod开发-大地图制作方法_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1rz4y1c7wH/ 一.骑砍游戏大地图 骑砍RTS视角游戏大地图 大地图静态模型(map.txt) 军团/城镇图标(module_parties.py). 骑砍大地图的战争迷雾和天气通过API进行管理和控制: # Weather-h…