canal 数据同步组件

canal 数据异构组件

为啥要使用这个组件?
在更新DB的时候不同步更新到redis,es等数据库中,时间太久,而且可能会存在同步失败的问题,因此引入canal去拉取DB的数据,再去更新到redis,es等数据库中,有失败重试和回滚等功能。
canal原理?
canal 伪装成salve向mysql发送dump协议,拿到备份数据binlog,去更新数据到redis,es等数据库中或者通过组装数据之后更新。canal可以拿到更新前的所有数据,更新后的所有数据,更新了哪些数据

canal 组件的使用

1.下载canal组件

下载地址canal组件下载地址
在我的资源中也有canal组件包
在这里插入图片描述
解压启动(我是windows版,双击startup.bat)

在这里插入图片描述

2.数据库配置

1.开启MySQL , 需要先开启 Binlog 写入功能

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

2.授权 canal 作为mysql 的slave 的权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3.项目引入jar包
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version>
</dependency>
4.写canal监听数据工具类
package com.next.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;
import java.util.List;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}CanalEntry.EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == CanalEntry.EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
}
5.简单例子使用测试

1.数据库更改user_id从0改为1,再从1改为0
2.查看canal监测的数据(canal可以拿到更新前的所有数据,更新后的所有数据,更新了哪些数据)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

6.进一步完善canal监听数据工具类,用于应用例子

1.加入监听器,项目启动时启动
2.使用线程去监听数据
3.替换掉system.out.print(),里面有锁,会阻塞,使用日志打印
4.处理canal监测到的数据

package com.next.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.next.dao.TrainNumberDetailMapper;
import com.next.service.TrainNumberService;
import com.next.service.TrainSeatService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;/*** @desc 不要用system.out.print()里面有锁,会阻塞,用日志打印*/
@Service
@Slf4j
public class CanalSubscribe implements ApplicationListener<ContextRefreshedEvent> {@Resourceprivate TrainSeatService trainSeatService;@Resourceprivate TrainNumberService trainNumberService;//监听,启动的时候就开始调用此监听方法@Overridepublic void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {canalSubscribe();}private void canalSubscribe() {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;//使用线程new Thread(() -> {try {log.info("canal subscribe");connector.connect();connector.subscribe(".*\\..*");connector.rollback();while (true) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {//没有取到数据继续safeSleep(100);continue;}try {log.info("new message,batchIds:{},size:{}", batchId, batchSize);//打印日志printEntry(message.getEntries());// 提交确认connector.ack(batchId);} catch (Exception e2) {log.error("canal data exception,batchIds:{}", batchId, e2);// 处理失败, 回滚数据connector.rollback(batchId);}}} catch (Exception e3) {log.error("canal subscribe exception", e3);safeSleep(1000);canalSubscribe();}}).start();}private void printEntry(List<CanalEntry.Entry> entrys) throws Exception{for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("RowChange.parse Exception , data:" + entry, e);}//更新类型-更新,删除,新增CanalEntry.EventType eventType = rowChage.getEventType();//数据库名String schemaName = entry.getHeader().getSchemaName();//表名String tableName = entry.getHeader().getTableName();log.info("name:[{},{}],eventType:{}",schemaName,tableName,eventType);for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {handleColumn(rowData.getBeforeColumnsList(), eventType, schemaName, tableName);} else {handleColumn(rowData.getAfterColumnsList(), eventType, schemaName, tableName);}}}}//处理canal监测到的数据private void handleColumn(List<CanalEntry.Column> columnsList, CanalEntry.EventType eventType, String schemaName, String tableName) throws Exception{if(schemaName.contains("12306_seat_")){//处理座位变更trainSeatService.handle(columnsList,eventType);}else if(tableName.equals("train_number")){//车次详情处理(实际上是车次信息变更之后才批量处理车次详情)trainNumberService.handle(columnsList,eventType);}else{log.info("drop data,no need care");}}private void safeSleep(int millis) {try {Thread.sleep(100);} catch (Exception e1) {}}}

处理canal监测到的数据(拿到改变的数据,放到实体类中,存到redis中)

package com.next.service;import com.alibaba.otter.canal.protocol.CanalEntry;
import com.next.dao.TrainNumberMapper;
import com.next.model.TrainNumber;
import com.next.model.TrainSeat;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.List;@Service
@Slf4j
public class TrainSeatService {@Resourceprivate TrainNumberMapper trainNumberMapper;@Resourceprivate TrainCacheService trainCacheService;//处理座位,canal通过监听座位库,拿到改变的数据,放到实体类中public void handle(List<CanalEntry.Column> columns, CanalEntry.EventType eventType) {if (eventType != CanalEntry.EventType.UPDATE) {log.info("not update,no need care");return;}TrainSeat trainSeat = new TrainSeat();boolean isStatusUpdated = false;for (CanalEntry.Column column : columns) {//票的状态改变了才做下面的操作if (column.getName().equals("status")) {trainSeat.setStatus(Integer.parseInt(column.getValue()));if (column.getUpdated()) {isStatusUpdated = true;} else {break;}} else if (column.getName().equals("id")) {trainSeat.setId(Long.parseLong(column.getValue()));} else if (column.getName().equals("carriage_number")) {trainSeat.setCarriageNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals("row_number")) {trainSeat.setRowNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals("seat_number")) {trainSeat.setSeatNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals("train_number_id")) {trainSeat.setTrainNumberId(Integer.parseInt(column.getValue()));} else if (column.getName().equals("ticket")) {trainSeat.setTicket(column.getValue());} else if (column.getName().equals("from_station_id")) {trainSeat.setFromStationId(Integer.parseInt(column.getValue()));} else if (column.getName().equals("to_station_id")) {trainSeat.setToStationId(Integer.parseInt(column.getValue()));}}if (!isStatusUpdated) {log.info("status not update,no need care");}log.info("train seat update,trainSeat:{}", trainSeat);/*** 数据存到redis* 1.指定座位被占:hash* cacheKey:车次_日期  D386_20231001* field: carriage_row_seat_fromStationId_toStationId* value: 0-空闲 1-占座** 2.每个座位详情剩余的座位数* cacheKey: 车次_日期_count D386_20231001_count* field: fromStationId_toStationId* value: 实际座位数**/TrainNumber trainNumber = trainNumberMapper.selectByPrimaryKey(trainSeat.getTrainNumberId());//放票if (trainSeat.getStatus() == 1) {trainCacheService.hset(trainNumber.getName() + "_" + trainSeat.getTicket(),trainSeat.getCarriageNumber() + "_" + trainSeat.getRowNumber() + "_" + trainSeat.getSeatNumber()+ "_" + trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),"0");trainCacheService.hincr(trainNumber.getName() + "_" + trainSeat.getTicket() + "_count",trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),1l);log.info("seat+1,trainNumber:{},trainSeat:{}", trainNumber, trainSeat);//占票} else if (trainSeat.getStatus() == 2) {trainCacheService.hset(trainNumber.getName() + "_" + trainSeat.getTicket(),trainSeat.getCarriageNumber() + "_" + trainSeat.getRowNumber() + "_" + trainSeat.getSeatNumber()+ "_" + trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),"1");trainCacheService.hincr(trainNumber.getName() + "_" + trainSeat.getTicket() + "_count",trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),-1l);log.info("seat-1,trainNumber:{},trainSeat:{}", trainNumber, trainSeat);} else {log.info("status update not 1 or 2,no need care");}}}

在这里插入图片描述

参考文档:canal使用说明文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/305551.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习核心技术与实践之深度学习基础篇

非书中全部内容&#xff0c;只是写了些自认为有收获的部分 神经网络 生物神经元的特点 &#xff08;1&#xff09;人体各种神经元本身的构成很相似 &#xff08;2&#xff09;早期的大脑损伤&#xff0c;其功能可能是以其他部位的神经元来代替实现的 &#xff08;3&#x…

CSS 向上扩展动画

上干货 <template><!-- mouseenter"startAnimation" 表示在鼠标进入元素时触发 startAnimation 方法。mouseleave"stopAnimation" 表示在鼠标离开元素时触发 stopAnimation 方法。 --><!-- 容器元素 --><div class"container&q…

深入理解Mysql MHA高可用集群搭建:从实验到实战

1. 简介 MHA&#xff08;Master High Availability&#xff09;是一个高效的开源MySQL高可用性解决方案。由日本开发者yoshinorim&#xff08;前DeNA员工&#xff0c;现在Facebook&#xff09;创建&#xff0c;MHA支持MySQL的主从复制架构&#xff0c;自动化主节点故障转移。当…

使用pytorch搭建ResNeXt并基于迁移学习训练

冻结除最后全连接层以外的所有权重&#xff0c;只去单独训练它最后一层的的权重&#xff0c;这个方法&#xff0c;冻结了所有网络的权重。 for param in net.parameters():param.requires_grad False

如何利用VR全景做好品牌营销?

VR全景技术能够为用户创造全新而沉浸式的体验&#xff0c;现在已经成为了品牌营销领域的重要工具。越来越多的企业开始在互联网上对自己进行宣传推广&#xff0c;但是线上推广渠道也是有很多的&#xff0c;该选择哪一种渠道更适合企业呢&#xff1f;现阶段又如何利用VR全景做好…

TPRI-DMP平台介绍

TPRI-DMP平台介绍 TPRI-DMP平台概述 TPRI-DMP为华能集团西安热工院自主产权的工业云PaaS平台&#xff0c;已经过13年的发展和迭代&#xff0c;其具备大规模能源电力行业生产应用软件开发和运行能力。提供TPRI-DMP平台主数据管理、业务系统开发与运行、应用资源管理与运维监控…

线程基础知识(三)

前言 之前两篇文章介绍了线程的基本概念和锁的基本知识&#xff0c;本文主要是学习同步机制&#xff0c;包括使用synchronized关键字、ReentrantLock等&#xff0c;了解锁的种类&#xff0c;死锁、竞争条件等并发编程中常见的问题。 关键字synchronized synchronied关键字可…

「Kafka」生产者篇

「Kafka」生产者篇 生产者发送消息流程 在消息发送的过程中&#xff0c;涉及到了 两个线程 ——main 线程和Sender 线程。 在 main 线程中创建了 一个 双端队列 RecordAccumulator。 main线程将消息发送给RecordAccumulator&#xff0c;Sender线程不断从 RecordAccumulator…

独立容器 Rancher Server 证书过期解决

问题 Rancher无法登录 容器报错X509&#xff1a;certificate has expired or is not ye valid 在某天需要发布新版本的时候&#xff0c;发现rancher无法登录&#xff0c;于是到服务器上查看rancher日志&#xff0c;发现以下内容&#xff1a; docker logs -f rancher --since10…

基于遗传算法的航线规划

MATLAB2016b可以正常运行 基于遗传算法的无人机航线规划资源-CSDN文库

k8s二进制部署--部署高可用

连接上文 notready是因为没有网络&#xff0c;因此无法创建pod k8s的CNI网络插件模式 1.pod内部&#xff0c;容器与容器之间的通信。 在同一个pod中的容器共享资源和网络&#xff0c;使用同一个网络命名空间。 2.同一个node节点之内&#xff0c;不同pod之间的通信。 每个pod都…

SQL Server 索引和视图

CSDN 成就一亿技术人&#xff01; 难度指数&#xff1a;* * * CSDN 成就一亿技术人&#xff01; 目录 1.索引 什么是索引&#xff1f; 索引的作用&#xff1f; 索引的分类 1. 唯一索引 2. 主键索引 3. 聚集索引 4.非聚集索引 5.复合索引 6.全文搜索 索引的创建&am…