canal 嵌入式部署 监听binlog

canal 嵌入式部署

  • 背景
    • 技术选型
    • canal
    • 原理
    • 用途
    • 嵌入式代码实现
      • 引入pom
      • 引入工具pom
      • main方法
      • 引入
      • 常量定义
      • install方法
      • buildCanal方法
      • pull方法
      • printSummary
      • printEntry2
    • 总结
    • 谢谢

背景

最近发现一个需求,需要监听mysql 数据库里的数据变动, 但由于架构方面的原因, 只能做成单体嵌入式的方向,嵌入进应用中,不用单独部署

技术选型

我对监控binlog 监控工具进行了了解,包括

  1. mysql-binlog-connector-java1
  2. canal2
  3. open-replicator3

canal

本篇博文主讲cannal 的嵌入模式

原理

在这里插入图片描述

用途

要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

嵌入式代码实现

引入pom

         <dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.common</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.deployer</artifactId><version>1.1.4</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.server</artifactId><version>1.1.4</version></dependency>

引入工具pom

        <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency>

main方法

public static void main(String[] args) {EmbeddedCanalListener embeddedCanalListener = new EmbeddedCanalListener();//安装实体embeddedCanalListener.install();//拉取消息embeddedCanalListener.pull();}

引入

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.instance.manager.CanalInstanceWithManager;
import com.alibaba.otter.canal.instance.manager.model.Canal;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.IndexMode;
import com.alibaba.otter.canal.instance.manager.model.CanalParameter.SourcingType;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.Pair;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

常量定义

//日志public static final Logger logger = LoggerFactory.getLogger(EmbeddedCanalListener.class);//随意命名protected static final String DESTINATION = "example";//测试sqlprotected static final String DETECTING_SQL = "select 1 from dual;";//MSQL配置protected static final String MYSQL_ADDRESS = "xxxx";protected static final int MYSQL_PORT = xxx;protected static final String USERNAME = "xxx";protected static final String PASSWORD = "xxx";//使用ORACLE 未实现protected static final String ORACLE_ADDRESS = "xx.xx.xx.xx";protected static final int ORACLE_PORT = xxx;protected static final String ORACLE_USERNAME = "xxx";protected static final String ORACLE_PASSWORD = "xxx";/*** 表筛选 , 这里默认全部* 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)* <p>* <p>* 常见例子:* <p>* 1.  所有表:.*   or  .*\\..* 2.  canal schema下所有表: canal\\..* 3.  canal下的以canal打头的表:canal\\.canal.* 4.  canal schema下的一张表:canal\\.test1* <p>* 5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)*/protected static final String FILTER = ".*";//定义一个serverprivate CanalServerWithEmbedded server;//定义一个clientprivate ClientIdentity clientIdentity = new ClientIdentity(DESTINATION, (short) 1);static {context_format = SEP + "****************************************************" + SEP;context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;context_format += "* Start : [{}] " + SEP;context_format += "* End : [{}] " + SEP;context_format += "****************************************************" + SEP;row_format = SEP+ "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , gtid : ({}) , delay : {} ms"+ SEP;transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms"+ SEP;}

install方法

        //获取一个instanceserver = CanalServerWithEmbedded.instance();//设置一个gennertor去生成server.setCanalInstanceGenerator(destination -> {//构建cannal 把上面的参数设置进去Canal canal = buildCanal();//返回一个Managerreturn new CanalInstanceWithManager(canal, FILTER);});//启动server.start();//启动这个实例server.start(DESTINATION);

buildCanal方法

		Canal canal = new Canal();//ID无意义 随便设置canal.setId(12L);canal.setName(DESTINATION);canal.setDesc("my standalone server test ");CanalParameter parameter = new CanalParameter();//parameter.setDataDir("./conf");//索引的模式, 嵌入式选择内存parameter.setIndexMode(IndexMode.MEMORY);//存储buffsize 具体看canal 官方的介绍parameter.setMemoryStorageBufferSize(32 * 1024);//设置Mysql的配置 包括模式,地址,默认scheme,用户名,密码,slaveId(查看mysql的My.conf),链接编码格式,缓存设置(看官方介绍)parameter.setSourcingType(SourcingType.MYSQL);parameter.setDbAddresses(Arrays.asList(new InetSocketAddress(MYSQL_ADDRESS, MYSQL_PORT)));parameter.setDefaultDatabaseName("XXXX");parameter.setDbUsername(MYSQL_USERNAME);parameter.setDbPassword(MYSQL_PASSWORD);//可以指定binlog 和起始位置 或者其实timestamp
//        parameter.setPositions(Arrays.asList("{\"journalName\":\"mysql-bin.000001\",\"position\":332L,\"timestamp\":\"1505998863000\"}",
//                "{\"journalName\":\"mysql-bin.000001\",\"position\":332L,\"timestamp\":\"1505998863000\"}"));parameter.setSlaveId(2L);parameter.setDefaultConnectionTimeoutInSeconds(30);parameter.setConnectionCharset("GBK");parameter.setConnectionCharsetNumber((byte) 33);parameter.setReceiveBufferSize(8 * 1024);parameter.setSendBufferSize(8 * 1024);//测试链接设置 ,这里是false 无意义parameter.setDetectingEnable(false);parameter.setDetectingIntervalInSeconds(10);parameter.setDetectingRetryTimes(3);parameter.setDetectingSQL(DETECTING_SQL);parameter.setGtidEnable(true);canal.setCanalParameter(parameter);return canal;

pull方法

		//定义拉取的大小int batchSize = 5 * 1024;while (running) {try {//订阅当前设定的clientserver.subscribe(clientIdentity);//循环拉取while (running) {Message message = server.getWithoutAck(clientIdentity, batchSize);List<CanalEntry.Entry> entries;//message如果是raw形式的需要去rawEntries去解析if (message.isRaw()) {List<ByteString> rawEntries = message.getRawEntries();entries = new ArrayList<>(rawEntries.size());for (ByteString byteString : rawEntries) {CanalEntry.Entry entry;try {entry = CanalEntry.Entry.parseFrom(byteString);} catch (InvalidProtocolBufferException e) {throw new RuntimeException(e);}entries.add(entry);}} else {//如果不是就直接拉取entries = message.getEntries();}long batchId = message.getId();int size = entries.size();//如果是batchId是负一或者无内容进行睡眠if (batchId == -1 || size == 0) {try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {//打印汇总信息printSummary(message, batchId, size);//打印实体信息printEntry2(entries);server.ack(clientIdentity, batchId); // 提交确认}}} finally {//取消订阅//server.unsubscribe(clientIdentity);}}

printSummary

//打印解读时间
//关注log的起始位置终止位置和时间延迟的可以关注这个类 如何取protected void printSummary(Message message, long batchId, int size) {long memsize = 0;for (Entry entry : message.getEntries()) {memsize += entry.getHeader().getEventLength();}String startPosition = null;String endPosition = null;if (!CollectionUtils.isEmpty(message.getEntries())) {startPosition = buildPositionForDump(message.getEntries().get(0));endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));}SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);logger.info(context_format,new Object[]{batchId, size, memsize, format.format(new Date()), startPosition, endPosition});}protected String buildPositionForDump(Entry entry) {long time = entry.getHeader().getExecuteTime();Date date = new Date(time);SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);String position = entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":"+ entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";if (StringUtils.isNotEmpty(entry.getHeader().getGtid())) {position += " gtid(" + entry.getHeader().getGtid() + ")";}return position;}

printEntry2

//打印实体private void printEntry2(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) 		      {//如果需要监控事务的可以在这里进行实现continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}//关注具体内容可以在这里实现EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}}//打印具体内容
protected void printColumn(List<Column> columns) {for (Column column : columns) {//如果column 是更新了的字段才打印if (column.getUpdated()) {StringBuilder builder = new StringBuilder();try {if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")|| StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {// get value bytesbuilder.append(column.getName() + " : " + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));} else {builder.append(column.getName() + " : " + column.getValue());}} catch (UnsupportedEncodingException e) {}builder.append("    type=" + column.getMysqlType());if (column.getUpdated()) {builder.append("    update=" + column.getUpdated());}builder.append(SEP);logger.info(builder.toString());}}}

总结

到这里 代码基本完成了, 然后根据自己的业务实现就好了
具体可以参考 canal java 客户端 的官方实现
还有他们的 AdminGuide 里面有详细的案例
推荐一份源码解析

谢谢


  1. mysql-binlog-connector-java ↩︎

  2. canal ↩︎

  3. open-replicator ↩︎

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/59743.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

利用 OLE 对象漏洞的 HWP 恶意文件浮出水面

ASEC 分析人员发现了一个利用 OLE 对象的恶意 HWP 文件&#xff0c;尽管其使用了 2020 年就被识别的恶意 URL&#xff0c;但仍然使用了 Flash 漏洞&#xff08;CVE-2018-15982&#xff09;&#xff0c;需要用户谨慎对待。 打开 HWP 文件时会在 %TEMP%文件夹中生成如下文件。攻…

数据库活动监控(DAM)

在当今数据驱动的世界中&#xff0c;组织在保护存储在数据库中的机密数据并确保其完整性方面面临着越来越多的挑战。数据库审计通过提供全面的数据库活动监控方法&#xff0c;在应对这些挑战方面发挥着至关重要的作用。 数据库活动监控&#xff08;Database Activity Monitori…

组合模式(C++)

定义 将对象组合成树形结构以表示部分-整体’的层次结构。Composite使得用户对单个对象和组合对象的使用具有一致性(稳定)。 应用场景 在软件在某些情况下&#xff0c;客户代码过多地依赖于对象容器复杂的内部实现结构&#xff0c;对象容器内部实现结构(而非抽象接口)的变化…

Centos7离线安装MySQL8

1、下载MySQL https://downloads.mysql.com/archives/community/ 2、下载完毕后&#xff0c;上传到Centos&#xff0c;解压 tar -xf mysql-8.0.33-1.el7.x86_64.rpm-bundle.tar 3、逐条执行安装命令 rpm -ivh mysql-community-common-8.0.33-1.el7.x86_64.rpm rpm -ivh …

HCIP实验

实验题目如下&#xff1a; 实验拓扑如下&#xff1a; 实验要求如下&#xff1a; 【1】两个协议间进行多点双向重发布 【2】R7的环回没有宣告在OSPF协议中&#xff0c;而是后期重发布进入的 【3】解决环路&#xff0c;所有路径选择最优&#xff0c;且存在备份 实验思路如下&…

langchain-ChatGLM源码阅读:参数设置

文章目录 上下文关联对话轮数向量匹配 top k控制生成质量的参数参数设置心得 上下文关联 上下文关联相关参数&#xff1a; 知识相关度阈值score_threshold内容条数k是否启用上下文关联chunk_conent上下文最大长度chunk_size 其主要作用是在所在文档中扩展与当前query相似度较高…

基于Doris实时数据开发的一些注意事项

300万字&#xff01;全网最全大数据学习面试社区等你来&#xff01; 最近Doris的发展大家是有目共睹的。例如冷热分离等新特性的持续增加。使得Doris在易用和成本上都有大幅提升。 基于Doris的一些存储实时数仓在越来越多的场景中开始有一些实践。大家也看到了这种方案频繁出现…

设备固定资产管理系统

资产管理是企业经营和发展的基础&#xff0c;特别是设备资产管理。适当的设备资产管理可以有效地提升企业的经营效率&#xff0c;为提高核心竞争能力提供高效的前提。 固资及设备管理系统&#xff08;EAM&#xff09;它是一种有效的固定资产管理模式&#xff0c;可以帮助企业更…

四 、Mysql 开发

四 、Mysql开发 102 可以使用MySQL直接存储文件吗&#xff1f; 可以使用 BLOB (binary large object)&#xff0c;用来存储二进制大对象的字段类型。 TinyBlob 255 值的长度加上用于记录长度的1个字节(8位) Blob 65K值的长度加上用于记录长度的2个字节(16位) MediumBlob 16M值…

【iOS安全】开启任意app的WebView远程调试

参考&#xff1a;https://mp.weixin.qq.com/s/bNKxQaVrPaXsZ5BPbsXy7w &#xff08;来自周智老师的公众号&#xff09; 概述 Safari 有一个内置的前端调试器&#xff0c; 在iPhone通过局域网或者USB连接MacBook 并启用Safari 远程调试之后&#xff0c;前端调试器默认情况下对…

Visual Studio Code中对打开的脚本格式统一

什么是Language Server Protocol (LSP)? Language Server Protocol&#xff08;语言服务器协议&#xff0c;简称LSP&#xff09;是微软在2016年提出的一套统一的通讯协议方案。LSP定义了一套编辑器或者IDE与语言服务器&#xff08;Language Server&#xff09;之间使用的协议&…

八大排序

目录 选择排序-直接插入排序 插入排序-希尔排序 选择排序-简单选择排序 选择排序-堆排序 交换排序-冒泡排序 交换排序-快速排序 归并排序 基数排序 选择排序-直接插入排序 基本思想: 如果碰见一个和插入元素相等的&#xff0c;那么插入元素把想插入的元素放在相等元素…