canal: 连接kafka (docker)

一、确保mysql binlog开启并使用ROW作为日志格式
docker 启动mysql 5.7配置文件
my.cnf

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server-id=1

在这里插入图片描述

在这里插入图片描述
一定要确保上述两个值一个为ROW,一个为ON

二、下载canal的run.sh

https://github.com/alibaba/canal/blob/master/docker/run.sh

三、启动canal容器,其中配置了mysql的信息和kafka的信息:

./run.sh -e canal.auto.scan=false -e canal.destinations=me -e canal.instance.master.address=xx.xx.xx.xx:3306  -e canal.instance.dbUsername=root  -e canal.instance.dbPassword=yourpassword  -e canal.instance.connectionCharset=UTF-8 -e canal.instance.tsdb.enable=true -e canal.instance.gtidon=false -e canal.instance.topic=me -e canal.mq.partition=0  -e canal.mq.topic=me

四、通过docker exec -it canal-server /bin/bash进入canal容器,编辑容器中的

/home/admin/canal-server/conf/canal.properties

其中要修改两处,一是工作模式改为kafka(canal.serverMode),二是改MQ的地址(canal.mq.servers),完整配置类似如下:

#################################################
######### 		common argument		############# 
#################################################
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB# binlog ddl isolation
canal.instance.get.ddl.isolation = false# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360# aliyun ak/sk , support rds/mq
canal.aliyun.accesskey =
canal.aliyun.secretkey =#################################################
######### 		destinations		############# 
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xmlcanal.instance.global.mode = spring
canal.instance.global.lazy = false
#canal.instance.global.manager.address = 127.0.0.1:1099
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml##################################################
######### 		     MQ 		     #############
##################################################
canal.mq.servers = xx.xx.xx.xx:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all

五、重启canal容器
docker restart canal-server

这样,对于上面的mysql 数据库中的增删改,在kakfa都可以在me 这个topic 收到数据了。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/578828.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙 UIAbility和Compent 生命周期

一、UIAbility的生命周期 在UIAbility的使用过程中,会有多种生命周期状态,掌握UIAbility的生命周期,对于应用的开发非常重要。 1、UIAbility的生命周期 UIAbility的生命周期主要分为以下4个: Create---Foreground---Background---…

拷贝对象时的一些编译器优化

在传参和传返回值的过程中&#xff0c;一般编译器会做一些优化&#xff0c;减少对象的拷贝&#xff0c;这个在一些场景下还 是非常有用的。 class A { public:A(int a 0):_a(a){cout << "A(int a)" << endl;}A(const A& aa):_a(aa._a){cout <<…

FL Studio21.2.3中文版音乐制作编曲软件功能展示讲解

FL Studio 21&#xff0c;确实被广大音乐制作人亲切地称为“水果”。这款软件以其强大的功能和用户友好的界面在音乐制作领域占据了重要地位。 FL Studio 21&#xff08;水果&#xff09;是一款全能的音乐创作软件&#xff0c;也是一款强大的编曲软件&#xff0c;可以作为编曲…

Hyper-V 虚拟机设置静态 IP 和外网访问

文章目录 环境说明1 问题简介2 解决过程 环境说明 宿主机操作系统&#xff1a;Windows 11 专业版漏洞复现操作系&#xff1a;debian-live-12.5.0-amd64-standard 1 问题简介 在 Windows 上用自带的 Hyper-V 虚拟机管理应用创建了一个 Debian 12 虚拟机&#xff0c;配置静态 IP…

Java基础之自增自减运算符

Java基础之自增自减运算符 基本用法 int a 10;a; System .out.prinln(a);//a11int a 10;a; System .out.prinln(a);//a11tip: 第一次运算之后a的值会更新 然后再进行下面的运算!!! 练习: 代码呈现: 结果: tip: x的值是最新的x的值.

windows下QT如何集成OpenCV

说明 我在windows下使用QT Creator12创建的CMake项目&#xff0c;需要OpenCV的一些功能。由于安装的时候我选择的QT组件都是MInGW的&#xff0c;所以无法使用VS studio版本的dll库。 为什么vs的版本不能用 我安装QT选择的是MinGW版本&#xff0c;本地编译QT工程只能选择MinG…

揭秘谷歌Gemini Pro 1.5:如何免费体验处理超长对话的AI模型?

最近Google发布大模型API&#xff0c;让人有点想哭的那种。 他们发布了Gemini Pro&#xff0c;这个东西的免费key每分钟能调用60次&#xff01; 想想看&#xff0c;这速度&#xff0c;比GPT-3.5以前的免费key快了30倍不止。 而且&#xff0c;更厉害的是&#xff0c;即使是Ge…

新能源充电桩站场视频汇聚系统建设方案及技术特点分析

随着新能源汽车的普及&#xff0c;充电桩作为新能源汽车的基础设施&#xff0c;其安全性和可靠性越来越受到人们的关注。为了更好地保障充电桩的安全运行与站场管理&#xff0c;TSINGSEE青犀&触角云推出了一套新能源汽车充电桩视频汇聚管理与视频监控方案。 方案采用高清摄…

NO12 蓝桥杯单片机之DS1302的使用

1 DS1302是什么 DS1302由两块存储器组成&#xff0c;一个是日历时钟寄存器还有一个是31位的静态RAM存储器。 而在蓝桥杯中常考的就是日历时钟寄存器&#xff0c;故这里只介绍日历时钟寄存器。简单来说&#xff0c;其就是一个“电子表”&#xff0c;他会自动的实时记录时间&am…

【服务端】node.js详细的配置

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;开发者-曼亿点 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 曼亿点 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a…

Centos JDK1.8 下载安装

https://www.oracle.com/java/technologies/javase/javase8u211-later-archive-downloads.html 一 RPM包安装 rpm -ivh jdk-8u391-linux-x64.rpm /etc/profile export JAVA_HOME/usr/java/jdk1.8.0-x64 export PATH$JAVA_HOME/bin:$PATHsource /etc/profile二 tar.gz 包手动…

k8s下搭建redis集群

记录一下近期实现的在k8s上搭建redis集群的过程 1、新建存储类 主要是为了和其它服务的存储类区分一下 redis-beta-storage 2、编写configMap redis启动时从configMap中读取配置 bind&#xff1a;默认的127.0.0.1可能会导致其它ip地址无法远程访问&#xff0c;因此修改为0.0…