canal 数据异构组件
为啥要使用这个组件?
如果在更新DB的同时同步更新redis、es等存储,耗时太久,而且可能会存在同步失败的问题;因此引入canal去拉取DB的变更数据,再异步更新到redis、es等存储中,并且有失败重试和回滚等功能。
canal原理?
canal 伪装成 MySQL 的 slave,向 MySQL 发送 dump 协议,拿到 binlog(二进制日志)并解析出变更数据,然后直接或者经过组装之后更新到redis、es等存储中。canal可以拿到更新前的所有数据、更新后的所有数据,以及更新了哪些数据。
canal 组件的使用
1.下载canal组件
下载地址canal组件下载地址
在我的资源中也有canal组件包
解压启动(我是windows版,双击startup.bat)
2.数据库配置
1.对于自建 MySQL,需要先开启 Binlog 写入功能,并将 binlog-format 配置为 ROW 模式
[mysqld]
log-bin=mysql-bin # enable the binlog
binlog-format=ROW # use ROW format
server_id=1 # required for MySQL replication; must not be the same as canal's slaveId
2.授权 canal 作为mysql 的slave 的权限
-- Create the account canal connects with (user "canal", password "canal").
CREATE USER canal IDENTIFIED BY 'canal';
-- Grant read access plus the replication privileges on every schema and table.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- Broader alternative, left disabled:
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
-- Reload the privilege tables so the grants above take effect.
FLUSH PRIVILEGES;
3.项目引入jar包
<!-- canal client library; the Java classes below import CanalConnector / CanalConnectors from it -->
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version>
</dependency>
4.写canal监听数据工具类
package com.next.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;
import java.util.List;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<CanalEntry.Entry> entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}CanalEntry.EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == CanalEntry.EventType.INSERT) 
{printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<CanalEntry.Column> columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());}}
}
5.简单例子使用测试
1.数据库更改user_id从0改为1,再从1改为0
2.查看canal监测的数据(canal可以拿到更新前的所有数据,更新后的所有数据,更新了哪些数据)
6.进一步完善canal监听数据工具类,用于应用例子
1.加入监听器,项目启动时启动
2.使用线程去监听数据
3.替换掉 System.out.println():它内部有锁,会阻塞,改用日志打印
4.处理canal监测到的数据
package com.next.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.next.dao.TrainNumberDetailMapper;
import com.next.service.TrainNumberService;
import com.next.service.TrainSeatService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;/*** @desc 不要用system.out.print()里面有锁,会阻塞,用日志打印*/
@Service
@Slf4j
public class CanalSubscribe implements ApplicationListener<ContextRefreshedEvent> {@Resourceprivate TrainSeatService trainSeatService;@Resourceprivate TrainNumberService trainNumberService;//监听,启动的时候就开始调用此监听方法@Overridepublic void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {canalSubscribe();}private void canalSubscribe() {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;//使用线程new Thread(() -> {try {log.info("canal subscribe");connector.connect();connector.subscribe(".*\\..*");connector.rollback();while (true) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {//没有取到数据继续safeSleep(100);continue;}try {log.info("new message,batchIds:{},size:{}", batchId, batchSize);//打印日志printEntry(message.getEntries());// 提交确认connector.ack(batchId);} catch (Exception e2) {log.error("canal data exception,batchIds:{}", batchId, e2);// 处理失败, 回滚数据connector.rollback(batchId);}}} catch (Exception e3) {log.error("canal subscribe exception", e3);safeSleep(1000);canalSubscribe();}}).start();}private void printEntry(List<CanalEntry.Entry> entrys) throws Exception{for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChage = null;try {rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("RowChange.parse Exception , data:" + entry, e);}//更新类型-更新,删除,新增CanalEntry.EventType eventType = rowChage.getEventType();//数据库名String schemaName = entry.getHeader().getSchemaName();//表名String tableName = entry.getHeader().getTableName();log.info("name:[{},{}],eventType:{}",schemaName,tableName,eventType);for (CanalEntry.RowData rowData : 
rowChage.getRowDatasList()) {if (eventType == CanalEntry.EventType.DELETE) {handleColumn(rowData.getBeforeColumnsList(), eventType, schemaName, tableName);} else {handleColumn(rowData.getAfterColumnsList(), eventType, schemaName, tableName);}}}}//处理canal监测到的数据private void handleColumn(List<CanalEntry.Column> columnsList, CanalEntry.EventType eventType, String schemaName, String tableName) throws Exception{if(schemaName.contains("12306_seat_")){//处理座位变更trainSeatService.handle(columnsList,eventType);}else if(tableName.equals("train_number")){//车次详情处理(实际上是车次信息变更之后才批量处理车次详情)trainNumberService.handle(columnsList,eventType);}else{log.info("drop data,no need care");}}private void safeSleep(int millis) {try {Thread.sleep(100);} catch (Exception e1) {}}}
处理canal监测到的数据(拿到改变的数据,放到实体类中,存到redis中)
package com.next.service;import com.alibaba.otter.canal.protocol.CanalEntry;
import com.next.dao.TrainNumberMapper;
import com.next.model.TrainNumber;
import com.next.model.TrainSeat;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.util.List;@Service
@Slf4j
public class TrainSeatService {@Resourceprivate TrainNumberMapper trainNumberMapper;@Resourceprivate TrainCacheService trainCacheService;//处理座位,canal通过监听座位库,拿到改变的数据,放到实体类中public void handle(List<CanalEntry.Column> columns, CanalEntry.EventType eventType) {if (eventType != CanalEntry.EventType.UPDATE) {log.info("not update,no need care");return;}TrainSeat trainSeat = new TrainSeat();boolean isStatusUpdated = false;for (CanalEntry.Column column : columns) {//票的状态改变了才做下面的操作if (column.getName().equals("status")) {trainSeat.setStatus(Integer.parseInt(column.getValue()));if (column.getUpdated()) {isStatusUpdated = true;} else {break;}} else if (column.getName().equals("id")) {trainSeat.setId(Long.parseLong(column.getValue()));} else if (column.getName().equals("carriage_number")) {trainSeat.setCarriageNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals("row_number")) {trainSeat.setRowNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals("seat_number")) {trainSeat.setSeatNumber(Integer.parseInt(column.getValue()));} else if (column.getName().equals("train_number_id")) {trainSeat.setTrainNumberId(Integer.parseInt(column.getValue()));} else if (column.getName().equals("ticket")) {trainSeat.setTicket(column.getValue());} else if (column.getName().equals("from_station_id")) {trainSeat.setFromStationId(Integer.parseInt(column.getValue()));} else if (column.getName().equals("to_station_id")) {trainSeat.setToStationId(Integer.parseInt(column.getValue()));}}if (!isStatusUpdated) {log.info("status not update,no need care");}log.info("train seat update,trainSeat:{}", trainSeat);/*** 数据存到redis* 1.指定座位被占:hash* cacheKey:车次_日期 D386_20231001* field: carriage_row_seat_fromStationId_toStationId* value: 0-空闲 1-占座** 2.每个座位详情剩余的座位数* cacheKey: 车次_日期_count D386_20231001_count* field: fromStationId_toStationId* value: 实际座位数**/TrainNumber trainNumber = trainNumberMapper.selectByPrimaryKey(trainSeat.getTrainNumberId());//放票if 
(trainSeat.getStatus() == 1) {trainCacheService.hset(trainNumber.getName() + "_" + trainSeat.getTicket(),trainSeat.getCarriageNumber() + "_" + trainSeat.getRowNumber() + "_" + trainSeat.getSeatNumber()+ "_" + trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),"0");trainCacheService.hincr(trainNumber.getName() + "_" + trainSeat.getTicket() + "_count",trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),1l);log.info("seat+1,trainNumber:{},trainSeat:{}", trainNumber, trainSeat);//占票} else if (trainSeat.getStatus() == 2) {trainCacheService.hset(trainNumber.getName() + "_" + trainSeat.getTicket(),trainSeat.getCarriageNumber() + "_" + trainSeat.getRowNumber() + "_" + trainSeat.getSeatNumber()+ "_" + trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),"1");trainCacheService.hincr(trainNumber.getName() + "_" + trainSeat.getTicket() + "_count",trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),-1l);log.info("seat-1,trainNumber:{},trainSeat:{}", trainNumber, trainSeat);} else {log.info("status update not 1 or 2,no need care");}}}
参考文档:canal使用说明文档