SpringBoot结合Canal 实现数据同步

1、Canal介绍

     Canal 指的是阿里巴巴开源的数据同步工具,用于数据库的实时增量数据订阅和消费。它可以针对 MySQL、MariaDB、Percona、阿里云RDS、Gtid模式下的异构数据同步等情况进行实时增量数据同步。

    当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x,Canal通过伪装成mysql从服务向主服务拉取数据。所以首先我们要了解mysql 主从数据同步。

2、Mysql 主从数据同步

1、从库(slave)会生成两个线程,I/O线程(IOthread),SQL线程(SQLthread)。

2、当slave的I/O线程连接到master后,会去请求master的二进制日志(binlog), 此时master会通过logdump(将主库的二进制日志文件内容传输给从库的过程) 给从库传输binlog。

3、 然后slave将拿到的binlog日志依次写入Relaylog(中继日志)的最末端,同时将读取到的Master 的bin-log的文件名和位置记录到master- info文件中,作用为了让slave知道它需要从哪个位置和哪 个日志文件开始同步数据,以保证数据的一致性,并且能够及时获取到master的新的更新操作, 开始数据同步过程。slave不仅在启动时读取 master-info 文件,而且会定期更新该文件中的记 录,以确保记录都是最新的。

4、最后SQL线程会读取Relaylog,并解析为具体操作(比如DDL这种),来实现主从库的操作一致。

3、Canal 原理

1、Canal Server与MySQL建立连接后,会通过模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议获取数据库的 binlog(二进制日志)文件。

2、Canal Server解析binlog文件,通过网络将解析后的事件传输给消息中间件(Kafka,RabbitMQ、Rocket等),实现数据的实时同步。

4、实现过程

       下载Canal,地址为https://github.com/alibaba/canal/releases,然后解压

  修改Canal的配置文件(conf/canal.properties)

指定模式canal.serverMode = rabbitMQ# 指定实例,多个实例使用逗号分隔: canal.destinations = example1,example2canal.destinations = example# rabbitmq 服务端 iprabbitmq.host = 你的ip(注意不要加端口号哦)# rabbitmq 虚拟主机rabbitmq.virtual.host = /# rabbitmq 交换机rabbitmq.exchange = canal.exchange  (这是本例子用的交换机)# rabbitmq 用户名rabbitmq.username = 你的用户名# rabbitmq 密码rabbitmq.password = 你的密码rabbitmq.deliveryMode =

修改实例配置文件 conf/example/instance.properties。

#配置 slaveId,自定义,不等于 mysql 的 server Id 即可canal.instance.mysql.slaveId=10# 数据库地址:配置自己的ip和端口canal.instance.master.address=你的IP:端口号# 数据库用户名和密码canal.instance.dbUsername=用户名canal.instance.dbPassword=密码# 指定库和表canal.instance.filter.regex=.*\..*    # 这里的 .* 表示 canal.instance.master.address 下面的所有数据库# mq config# rabbitmq 的 routing keycanal.mq.topic=canal.routing.key(这是本例子用的key)

然后重启 canal 服务。

2 、搭建RabbitMq并且 配置Exchange、Routing key。

3、Spring  Boot 实现代码

   引入依赖

<!--引入rabbitmq集成的依赖-->       <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

编辑配置文件

spring:#rabbitmqrabbitmq:#    host: 192.168.31.189host: 192.168.1.12port: 5672virtual-host: /username: adminpassword: adminlistener:simple:         #手动确认消息 ackacknowledge-mode: manualretry:enabled: truemax-attempts: 3max-interval: 1000ms
server:port: 8888

   定义队列的消息接受的数据对象

@Data
public class CanalMessage<T> {private String type;private String table;private List<T> data;private String database;private Long es;private Integer id;private Boolean isDdl;private List<T> old;private List<String> pkNames;private String sql;private Long ts;
}

定义rabbitmq的配置,代码如下

@Configuration
@Slf4j
public class RabbitConfig {/*** 消息序列化配置*/@Beanpublic RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {// SimpleRabbitListenerContainerFactory 是 RabbitMQ 提供的一个实现了 RabbitListenerContainerFactory 接口的简单消息监听器容器工厂。// 它的作用是创建和配置 RabbitMQ 消息监听器容器,用于监听和处理消息。SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();//ConnectionFactory 是 RabbitMQ 提供的一个接口,用于创建 RabbitMQ 的连接factory.setConnectionFactory(connectionFactory);//使用了 Jackson2JsonMessageConverter 将消息转换为 JSON 格式进行序列化和反序列化factory.setMessageConverter(  new Jackson2JsonMessageConverter());return factory;}
}

消费的代码

@Component
@RabbitListener(queues = "data")
public class MqNotifyReceive {@RabbitHandlerpublic void receive(CanalMessage CanalMessage) {//进行同步业务处理。。。 }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/681859.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

揿针贴一般贴多长时间?

点击文末领取揿针的视频教程跟直播讲解 揿针作为一种侵入性疗法&#xff0c;一般贴48-72小时&#xff0c;相对来说比较安全&#xff0c;取针后24小时内都不建议碰水&#xff0c;以免局部发炎。 一般揿针都贴到背部以及需要治疗的区域&#xff0c;揿针进针的深度非常浅&#x…

每日Attention学习5——Multi-Scale Channel Attention Module

模块出处 [link] [code] [WACV 21] Attentional Feature Fusion 模块名称 Multi-Scale Channel Attention Module (MS-CAM) 模块作用 通道注意力 模块结构 模块代码 import torch import torch.nn as nnclass MS_CAM(nn.Module):def __init__(self, channels64, r4):super(…

物联网网关制造生产全流程揭秘!

如果您正有开发和定制物联网网关的计划&#xff0c;找一个专业的物联网设备厂商协助您制造生产物联网网关可以节省大量时间和成本&#xff0c;可以让您能专注于当前核心业务&#xff0c;而无需将精力过多地投入到自己不擅长的领域。 当然&#xff0c;了解物联网网关的测试和制…

Android 高版本实现沉浸式状态栏

目前实现的android高版本沉浸式状态栏分为两类&#xff1a; 1、是纯透明状态栏&#xff1b; 2、是纯透明状态栏&#xff0c;但是状态栏字体是黑色&#xff1b; 将状态栏的代码封装到BaseActivity中更方便使用&#xff1a; BaseActivity: public abstract class BaseActivit…

重学java 33.API 4.日期相关类

任何事&#xff0c;必作于细&#xff0c;也必成于实 —— 24.5.9 一、Date日期类 1.Date类的介绍 1.概述: 表示特定的瞬间,精确到亳秒 2.常识: a.1000毫秒 1秒 b.时间原点:1970年1月1日 0时0分0秒(UNIX系统起始时间),叫做格林威治时间,在0时区上 c.时区:北京位于东八区,一个时区…

一个递推通项公式研究

递推关系为a(n) ​pa(n−1) ​ qa(n−2) ​&#xff0c;本项前一项*2前前项&#xff0c;具体如 1&#xff0c;1&#xff0c;3&#xff0c;7&#xff0c;17&#xff0c;41&#xff0c;99&#xff0c;239&#xff0c;…… 一般的递推关系可以用以下方法 得两个解&#xff1a; …

WPF控件之StackPanel布局控件

StackPanel别名堆栈panel 使其子元素按照一定方式进行布局&#xff0c;子元素排布方式要么设置为水平排布&#xff0c;要么垂直排布。 属性 Orientation设置排列方式(默认的是垂直排布) : Horizontal水平排布 Vertical 垂直排布 实例 <StackPanel Orientation"Vert…

【Python】什么是皮尔森系数

我不完美的梦 你陪着我想 不完美的勇气 你说更勇敢 不完美的泪 你笑着擦干 不完美的歌 你都会唱 我不完美心事 你全放在心上 这不完美的我 你总当做宝贝 你给我的爱也许不完美 但却最美 &#x1f3b5; 周冬雨《不完美女孩》 皮尔森相关系数&#xff08;Pe…

力扣HOT100 - 4. 寻找两个正序数组的中位数

解题思路&#xff1a; 两个数组合并&#xff0c;然后根据奇偶返回中位数。 class Solution {public double findMedianSortedArrays(int[] nums1, int[] nums2) {int m nums1.length;int n nums2.length;int[] nums new int[m n];if (m 0) {if (n % 2 0) return (nums2…

自动控制原理学习--平衡小车的控制算法(三)

上一节PID的simulin仿真&#xff0c;这一节用LQR 一、模型 二、LQR LQR属于现代控制理论的一个很重要的点&#xff0c;这里推荐B站的【Advanced控制理论】课程&#xff08;up主DR_CAN&#xff09;&#xff0c;讲得很好&#xff0c;这里引用了他视频里讲LQR的ppt。 LQR属于lo…

k8s概述及核心组件

一、k8s概述 1.1 引言 docker compose 单机编排工具 有企业在用 docker swarm 能够在多台主机中构建一个docker集群 基本淘汰集群化管理处理工具 容器 微服务封装 dockerfile 编写成镜像 然后进行发布 dockerfile 可以写成shell脚本&#xff08;函数做调…

手机视频提取gif怎么操作?分享这个方法不能错过!

随着网络的发展动态gif表情包已经是人们交流的重要部分了。想要通过手机来实现视频转换gif的操作&#xff0c;还不想下载软件的情况下。可以通过使用手机端的视频转gif工具-GIF中文网&#xff0c;无需下载软件。手机端轻松一键就能在线实现视频提取gif的操作。一起来看看具体的…