Canal安装使用

一 Canal介绍

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL(也支持mariaDB)。

背景
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。ps. 目前内部使用的同步,已经支持mysql5.x和oracle部分版本的日志解析了;

工作原理
在这里插入图片描述
原理相对比较简单:
canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议,mysql master收到dump请求,开始推送binary log给slave(也就是canal),canal解析binary log对象(原始为byte流);
在这里插入图片描述

二 linux下安装配置启动Canal

下载
首先,下载canal安装包:canal下载地址
在这里插入图片描述
安装步骤
1、创建一个canal文件夹,上传压缩包并解压
2、修改canal配置文件

vi conf/example/instance.properties

在这里插入图片描述
3、修改mysql配置

vi /etc/my.cnf 
log-bin=mysql-bin             #添加这一行就ok 
binlog-format=ROW           #选择row模式 
server-id=1       #配置mysql replaction需要定义,不能和canal的slaveI重复
binlog-do-db=micromall

4、执行mysql 创建canal用户

create user canal identified by 'canal';GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';FLUSH PRIVILEGES;

查看是否授权成功:

select * from user  where  user='canal'

canal的工作原理

  1. canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  2. MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal );
  3. canal 解析 binary log 对象(原始为 byte 流);

5、启动canal

cd bin
./startup.sh

三 测试Canal

逻辑:
mysql开启主从同步
启动Canal,拿到mysql的binlog日志
消息中间件获取binlog日志
添加依赖

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.0</version>
</dependency>
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
}

修改数据库的值,发现日志里有新的输出
在这里插入图片描述

四 接入中间件

目的:修改mysql数据后,把消息同步到RocketMQ

修改instance 配置文件

vi conf/example/instance.properties
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

instance.properties完整版

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0# enable gtid use true/false
canal.instance.gtidon=false# position info
#canal.instance.master.address=192.168.65.232:3306
canal.instance.master.address=192.168.65.232:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName=micromall
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex
#canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=micromall.pms_product,micromall.sms_flash_promotion_product_relation
# table black regex
canal.instance.filter.black.regex=# mq config
canal.mq.topic=productDetailChange
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

修改canal 配置文件

vi /usr/local/canal/conf/canal.properties
##################################################
#########                    MQ     #############
##################################################
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 192.168.1.150:9876
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
#消息生产组名
canal.mq.producerGroup = Canal-Producer
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 30
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false
#canal.mq.properties. =

canal.properties完整版

#################################################
######### 		common argument		############# 
#################################################
#canal.manager.jdbc.url=jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
#canal.manager.jdbc.username=root
#canal.manager.jdbc.password=121212
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = RocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB# binlog ddl isolation
canal.instance.get.ddl.isolation = false# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =#################################################
######### 		destinations		############# 
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xmlcanal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml##################################################
######### 		     MQ 		     #############
##################################################
##################################################
#########                    MQ     #############
##################################################
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:9876
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
#消息生产组名
canal.mq.producerGroup = canalProducer
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 30
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# use transaction for kafka flatMessage batch produce
canal.mq.transaction = false
#canal.mq.properties. =

参数说明:
在这里插入图片描述

五 Canal内部原理

在这里插入图片描述

  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列 (1个server对应1…n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储)
  • metaManager (增量订阅&消费信息管理器)
mysql> show variables like 'binlog_format';+---------------+-------+| Variable_name | Value |+---------------+-------+| binlog_format | ROW   |+---------------+-------+1 row in set (0.00 sec)

解析开始:
com.alibaba.otter.canal.parse.inbound.AbstractEventParser#start

六 Canal集群高可用

canal的ha分为两部分,canal server和canal client分别有对应的ha实现

canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.

canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定)。
在这里插入图片描述
大致步骤:

  1. canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
  2. 创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
  3. 一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
  4. canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.

Canal Client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/526382.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

少儿编程 蓝桥杯青少组科技素养题 信息素养真题及解析第25套

少儿编程 科技素养 信息素养真题第25套 1、旅行结束之后&#xff0c;回到家的小蓝决定将照片备份在云端的网盘上。备份照片主要占用的是小蓝家的( )带宽 A、下行 B、上行 C、文件 D、数据 答案&#xff1a;B 考点分析&#xff1a;主要考查网络相关知识&#xff0c;要将照…

公众号IP白名单已添加服务器IP 122.88... 依然给出 40164 错误

公众号的IP白名单已添加 122.88... 依然给出 40164 错误。 {"errcode":40164,"errmsg":"invalid ip 122.88... ipv6 ::ffff:122.88..., not in whitelist rid: 65e85a07-458dfc0d-16003e03"} 解决方案&#xff1a; 一、检查 AppID 是否正确&…

【数学】【组合数学】1830. 使字符串有序的最少操作次数

作者推荐 视频算法专题 本博文涉及知识点 数学 组合数学 LeetCode1830. 使字符串有序的最少操作次数 给你一个字符串 s &#xff08;下标从 0 开始&#xff09;。你需要对 s 执行以下操作直到它变为一个有序字符串&#xff1a; 找到 最大下标 i &#xff0c;使得 1 < i…

Javaweb之Maven高级之继承的详细解析

2.1 继承 我们可以再创建一个父工程 tlias-parent &#xff0c;然后让上述的三个模块 tlias-pojo、tlias-utils、tlias-web-management 都来继承这个父工程 。 然后再将各个模块中都共有的依赖&#xff0c;都提取到父工程 tlias-parent中进行配置&#xff0c;只要子工程继承了…

C++单例模式、工厂模式

一、单例模式 (一) 什么是单例模式 1. 是什么&#xff1f; 在系统的整个生命周期内&#xff0c;一个类只允许存在一个实例。 2. 为什么&#xff1f; 两个原因&#xff1a; 节省资源。方便控制&#xff0c;在操作公共资源的场景时&#xff0c;避免了多个对象引起的复杂操作…

云原生之容器编排实践-ruoyi-cloud项目部署到K8S:Nginx1.25.3

背景 前面搭建好了 Kubernetes 集群与私有镜像仓库&#xff0c;终于要进入服务编排的实践环节了。本系列拿 ruoyi-cloud 项目进行练手&#xff0c;按照 MySQL &#xff0c; Nacos &#xff0c; Redis &#xff0c; Nginx &#xff0c; Gateway &#xff0c; Auth &#xff0c;…

异步编程实战:使用C#实现FTP文件下载及超时控制

博客标题: 异步编程实战&#xff1a;使用C#实现FTP文件下载及超时控制 如果你的函数不是async&#xff0c;你仍然可以实现相同的超时功能&#xff0c;但你将不得不依赖更多的同步代码或使用.Result或.GetAwaiter().GetResult()来阻塞等待任务完成&#xff0c;这可能导致死锁的风…

mybatis-plus整合spring boot极速入门

使用mybatis-plus整合spring boot&#xff0c;接下来我来操作一番。 一&#xff0c;创建spring boot工程 勾选下面的选项 紧接着&#xff0c;还有springboot和依赖我们需要选。 这样我们就创建好了我们的spring boot&#xff0c;项目。 简化目录结构&#xff1a; 我们发现&a…

uniapp 解决请求出现 /sockjs-node/info?t=问题

1. uniapp请求出现 /sockjs-node/info?t问题 1.1. 问题 uniapp项目老是出现 http://192.168.2.106:8080/sockjs-node/info?t1709704280949 1.1. sockjs-node介绍 sockjs-node 是一个JavaScript库&#xff0c;提供跨浏览器JavaScript的API&#xff0c;创建了一个低延迟、全…

JAVA实战开源项目:快递管理系统(Vue+SpringBoot)

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容2.1 数据中心模块2.2 快递类型模块2.3 快递区域模块2.4 快递货架模块2.5 快递档案模块 三、界面展示3.1 登录注册3.2 快递类型3.3 快递区域3.4 快递货架3.5 快递档案3.6 系统基础模块 四、免责说明 一、摘要 1.1 项目介绍 …

Pulsar 社区周报 | No.2024.03.08 Pulsar-Spark Connector 助力实时计算

关于 Apache Pulsar Apache Pulsar 是 Apache 软件基金会顶级项目&#xff0c;是下一代云原生分布式消息流平台&#xff0c;集消息、存储、轻量化函数式计算为一体&#xff0c;采用计算与存储分离架构设计&#xff0c;支持多租户、持久化存储、多机房跨区域数据复制&#xff0c…

201909 青少年软件编程(Scratch)等级考试试卷(一级)

第1题&#xff1a;【 单选题】 小明在做一个采访的小动画&#xff0c;想让主持人角色说“大家好&#xff01;”3秒钟&#xff0c;用下列程序中的哪一个可以实现呢&#xff1f;&#xff08; &#xff09; A: B: C: D: 【正确答案】: B 【试题解析】 : 第2题&#xff1a…