Java 监听Mysql binlog

使用 mysql-binlog-connector-java

1. mysql-binlog-connector-java 官网
2. Java代码中,如何监控Mysql的binlog?

前置条件

1. mysql服务器表结构
CREATE TABLE `student` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,`age` int NOT NULL,`code` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,PRIMARY KEY (`id`),KEY `idx_name` (`name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
2. 开启master的mysql 服务器的log_bin
show variables like 'log_bin';

log-bin状态
如果没有,那么设置文件中增加配置

log_bin=mysql-bin
binlog-format=ROW
server-id=1

重启服务

  • 在配置文件中加入了log_bin配置项后,表示启用了binlog
  • binlog-format是binlog的日志格式,支持三种类型,分别是STATEMENT、ROW、MIXED,我们在这里使用ROW模式
  • server-id用于标识一个sql语句是从哪一个server写入的,这里一定要进行设置,否则我们在后面的代码中会无法正常监听到事件

引入maven依赖

<dependency><groupId>com.zendesk</groupId><artifactId>mysql-binlog-connector-java</artifactId><version>0.25.0</version>
</dependency>

代码块实现监听

查看一个简单的case

public static void main(String[] args) {// 这里的账号必须要有权限访问BinaryLogClient client = new BinaryLogClient("127.0.0.1", 3306, "root", "root");// 反序列化配置EventDeserializer eventDeserializer = new EventDeserializer();eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
//                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);// 设置反序列化配置client.setEventDeserializer(eventDeserializer);// 设置自己的client作为服务器的idclient.setServerId(3);// 可选,设置start fileName+position
//        client.setBinlogFilename("master-bin.000080");
//        client.setBinlogPosition(219);client.registerEventListener(event -> {EventData data = event.getData();String tableName;if (data instanceof TableMapEventData) {System.out.println("Table:");TableMapEventData tableMapEventData = (TableMapEventData) data;System.out.println(tableMapEventData.getTableId() + ": [" + tableMapEventData.getDatabase() + "." + tableMapEventData.getTable() + "]");tableName = tableMapEventData.getTable();// 如果是不处理的表,那么返回if (!Objects.equals(tableName, "student"))return;}if (data instanceof UpdateRowsEventData) {
//                System.out.println("Update:");
//                System.out.println(data);// 获取对应的操作对象的json化数据UpdateRowsEventData udata = (UpdateRowsEventData) data;List<Map.Entry<Serializable[], Serializable[]>> rows = udata.getRows();for (Map.Entry<Serializable[], Serializable[]> row : rows) {List<Serializable> entries = Arrays.asList(row.getValue());JSONObject dataObject = getDataObject(entries);System.out.println(dataObject);}} else if (data instanceof WriteRowsEventData) {WriteRowsEventData wData = new WriteRowsEventData();wData.getIncludedColumns();wData.getRows();System.out.println("Insert:");System.out.println(data);} else if (data instanceof DeleteRowsEventData) {System.out.println("Delete:");System.out.println(data);}});try {client.connect();} catch (IOException e) {e.printStackTrace();}}/*** 根据message获取对象*/private static JSONObject getDataObject(List<Serializable> message) {JSONObject resultObject = new JSONObject();String format = "{\"id\":\"0\",\"name\":\"1\",\"age\":\"2\",\"code\":\"3\"}";JSONObject json = JSON.parseObject(format);for (String key : json.keySet()) {resultObject.put(key, message.get(json.getInteger(key)));}return resultObject;}

首先,创建一个BinaryLogClient客户端对象,初始化时需要传入mysql的连接信息,创建完成后,给客户端注册一个监听器,来实现它对binlog的监听和解析。在监听器中,我们暂时只对4种类型的事件数据进行了处理,除了WriteRowsEventData、DeleteRowsEventData、UpdateRowsEventData对应增删改操作类型的事件数据外,还有一个TableMapEventData类型的数据,包含了表的对应关系,在后面的例子中再具体说明。
在这里,客户端监听到的是数据库级别的所有事件,并且可以监听到表的DML语句和DDL语句,所以我们只需要处理我们关心的事件数据就行,否则会收到大量的冗余数据。

启动程序,控制台输出:
在这里插入图片描述

我们需要知道表的信息或者的行信息,在反序列化的过程中,我们调用UpdateRowsEventData中只有行的index,而没有name,那么这个时候需要我们自己定义每列对应的续好了,然后解析获取到的row的信息。
在这里插入图片描述
方法如:

private static JSONObject getDataObject(List<Serializable> message) {JSONObject resultObject = new JSONObject();String format = "{\"id\":\"0\",\"name\":\"1\",\"age\":\"2\",\"code\":\"3\"}";JSONObject json = JSON.parseObject(format);for (String key : json.keySet()) {resultObject.put(key, message.get(json.getInteger(key)));}return resultObject;}

表和列信息模板

在getDataObject中,我们需要知道每个cloumnId对应的name,那么,我们需要自定义一个format,但是columnId和index的关系我们可以从information_schema中获取,当然,这就需要处理了,这里另说。
在这里插入图片描述

指定binlog的起始位置

By default, BinaryLogClient starts from the current (at the time of connect) master binlog position. If you wish to kick off from a specific filename or position, use client.setBinlogFilename(filename) + client.setBinlogPosition(position).

通常来说,监听从启动开始,但是也可以指定fileName+position

// 可选,设置start fileName+position,后续的话会都会开始执行的,包括后续的binlog file
client.setBinlogFilename("master-bin.000079");
client.setBinlogPosition(4);

查找binLog中的信息

参考:MySQL Binlog二进制日志基于position位置点恢复数据

如果无法确定启动的bin文件和postion,可以查看相关的binlog 文件,

查找binlog的文件名

查看master-server的配置文件,查看目录和文件前缀 mysql.ini
在这里插入图片描述
在目录下查找对应的binLog文件

在这里插入图片描述

查看binlong文件中的信息

show binlog events in 'master-bin.000080';

在这里插入图片描述

总结

1. git代码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/53476.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【测试联调】如何在前后端测试联调时优雅的构造异常场景

目录 背景 使用iptables实现 利用iptables丢弃某ip数据包 使用 -L 列出所有规则 IP 连通性 通信 测试 插入一条规则&#xff0c;丢弃此ip 的所有协议请求 列出所有规则 测试 丢弃规则内的IP 连通性 清除 规则列表的 限制 模拟ip进行丢包50%的处理。 mysql proxy 代理…

(亲测解决)PyCharm 从目录下导包提示 unresolved reference(完整图解)

最近在进行一个Flask项目的过程中遇到了unresolved reference 包名的问题&#xff0c;在网上找了好久解决方案&#xff0c;并没有一个能让我一步到位解决问题的。 后来&#xff0c;我对该问题和网上的解决方案进行了分析&#xff0c;发现网上大多数都是针对项目同一目录下的py…

【数据库】将excel数据导入mysql数据库

环境&#xff1a;Windows10 mysql8以上 将你要导入的excel表另存为txt格式 打开txt格式文件&#xff0c;删除表头行并另存为并更改编码方式&#xff08;由于与数据库的编码不同&#xff0c;会导致导入报错&#xff09; 通过命令行登录数据库 winr cmd进入 进入装mysql的目录位…

【JVM】(二)深入理解Java类加载机制与双亲委派模型

文章目录 前言一、类加载过程1.1 加载&#xff08;Loading&#xff09;1.2 验证&#xff08;Verification&#xff09;1.3 准备&#xff08;Preparation&#xff09;1.4 解析&#xff08;Resolution&#xff09;1.5 初始化&#xff08;Initialization&#xff09; 二、双亲委派…

LIME(可解释性分析方法)

目录 1.什么是LIME 2.思路 3.LIME在不同任务中的范式&#xff08;待补充&#xff09; 1.什么是LIME 简单理解&#xff1a; 对于分类任务&#xff1a;如下图所示&#xff0c;LIME可以列出分类结果&#xff0c;所依据特征对应给比重。 对于图像分类任务&#xff1a;如下图所示&a…

git 忽略掉不需要的文件

第一步&#xff1a;创建.gitignore文件 touch .gitignore 第二步&#xff1a;使用vi编辑器 输入不需要的文件&#xff0c;或用通配符*来忽视一系列文件 效果&#xff1a;

华为OD机试之报文回路(Java源码)

题目描述 IGMP 协议中响应报文和查询报文&#xff0c;是维系组播通路的两个重要报文&#xff0c;在一条已经建立的组播通路中两个相邻的 HOST 和 ROUTER&#xff0c;ROUTER 会给 HOST 发送查询报文&#xff0c;HOST 收到查询报文后给 ROUTER 回复一个响应报文&#xff0c;以维持…

每日一题——回文链表

回文链表 题目链接 回文结构即字符串正序逆序完全一致&#xff0c;如“1 2 3 4 3 2 1”&#xff0c;那么我们就要想办法同时比较链表头和链表尾的元素&#xff0c;看其是否相等。 下面介绍一种最常用的方法&#xff1a; 思路 如果我们仔细观察回文结构&#xff0c;就会得到一…

差值结构的相互作用能

( A, B )---3*30*2---( 1, 0 )( 0, 1 ) 让网络的输入只有3个节点&#xff0c;AB训练集各由6张二值化的图片组成&#xff0c;让A&#xff0c;B中各有3个点&#xff0c;且不重合&#xff0c;统计迭代次数并排序。 其中有10组数据 差值结构 A-B 迭代次数 构造平均列 平均列…

基于 Redux + TypeScript 实现强类型检查和对 Json 的数据清理

基于 Redux TypeScript 实现强类型检查和对 Json 的数据清理 突然像是打通了任督二脉一样就用了 generics 搞定了之前一直用 any 实现的类型…… 关于 Redux 的部分&#xff0c;这里不多赘述&#xff0c;基本的实现都在这里&#xff1a;Redux Toolkit 调用 API 的四种方式 和…

Spring 事务详解(注解方式)

目 录 序言 1、编程式事务 2、配置声明式事务 2.1 基于TransactionProxyFactoryBean的方式&#xff08;不常用&#xff0c;因为要为每一个类配置TransactionProxyFactoryBean&#xff09; 2.2 基于AspectJ的XML方式&#xff08;常用&#xff0c;可配置在某些类下的所有子…

【C++】string类

目录 &#x1f31e;专栏导读 &#x1f31b;为什么学习string类&#xff1f; ⭐C语言中的字符串 &#x1f31b;标准库中的string类 ⭐基本使用string ⭐string类的常用接口 ⭐总结&#xff1a; &#x1f31b;范围for的使用 &#x1f31e;专栏导读 &#x1f31f;作者简介…