A Comprehensive Guide to the SeaTunnel MySQL CDC Connector: A Powerful Tool for Efficient Data Synchronization

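As data volumes grow rapidly, synchronizing and processing information from many data sources in real time and efficiently has become a major challenge for enterprises and developers.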

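MySQL is one of the most widely used databases, and change data capture (CDC) on MySQL is essential to meeting this challenge. This article takes a close look at the MySQL CDC source connector in the SeaTunnel framework, covering everything from basic setup to advanced configuration.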

MySQL CDC Source Connector

Supported Engines

SeaTunnel Zeta
Flink

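Key Features

  • Batch
  • Stream
  • Exactly-once
  • Column projection
  • Parallelism
  • Support for user-defined splits

Description

The MySQL CDC connector reads both snapshot data and incremental data from a MySQL database. This document describes how to set up the MySQL CDC connector to run SQL queries against MySQL databases.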

Supported Data Source Info

| Data source | Supported versions | Driver | Url | Maven |
|---|---|---|---|---|
| MySQL | MySQL: 5.6, 5.7, 8.0.x; RDS MySQL: 5.6, 5.7, 8.0.x | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28 |

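Database Dependency

Install the JDBC Driver

Download the MySQL driver and place it in the ${SEATUNNEL_HOME}/lib/ directory. For example: cp mysql-connector-java-xxx.jar $SEATUNNEL_HOME/lib/

Creating a MySQL User

You must define a MySQL user with appropriate permissions on all databases that the Debezium MySQL connector monitors.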

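  1. Create the MySQL user:
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
  2. Grant the required permissions to the user. The password is set only in CREATE USER, because MySQL 8.0 no longer accepts an IDENTIFIED BY clause in GRANT:
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'localhost';
  3. Finalize the user's permissions:
mysql> FLUSH PRIVILEGES;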

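Enabling the MySQL Binlog

Binary logging must be enabled for MySQL replication. The binary log records transaction updates so that replication tools can propagate the changes.

  1. Check whether the log-bin option is already enabled: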
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
| Variable_name            | Value          |
+--------------------------+----------------+
| binlog_format            | ROW            |
| binlog_row_image         | FULL           |
| enforce_gtid_consistency | ON             |
| gtid_mode                | ON             |
| log_bin                  | ON             |
+--------------------------+----------------+
5 rows in set (0.00 sec)
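  2. If the result differs from the output above, configure your MySQL server configuration file ($MYSQL_HOME/mysql.cnf) with the following properties:
# Enable the binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary; expiration can be short for integration tests but should be longer on production systems.
# Row-level information is required for ingestion to work.
# A server ID is required, but the value will differ on production systems.
server-id         = 223344
log_bin           = mysql-bin
expire_logs_days  = 10
binlog_format     = row
binlog_row_image  = FULL

# Enable GTID mode
gtid_mode = on
enforce_gtid_consistency = on
  3. Restart the MySQL server:
/etc/init.d/mysqld restart
  4. Confirm your changes by checking the binlog status once more: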
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
+--------------------------+----------------+
| Variable_name            | Value          |
+--------------------------+----------------+
| binlog_format            | ROW            |
| binlog_row_image         | FULL           |
| enforce_gtid_consistency | ON             |
| gtid_mode                | ON             |
| log_bin                  | ON             |
+--------------------------+----------------+
5 rows in set (0.00 sec)
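
The comparison in steps 1 and 4 can also be scripted. The following is a minimal sketch, assuming the rows of the SHOW VARIABLES query have already been loaded into a dictionary; the names REQUIRED_SETTINGS and find_binlog_mismatches are hypothetical helpers for illustration and are not part of SeaTunnel or MySQL.

```python
# Expected values, copied from the SHOW VARIABLES output shown above.
REQUIRED_SETTINGS = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
    "gtid_mode": "ON",
    "enforce_gtid_consistency": "ON",
}


def find_binlog_mismatches(variables):
    """Return {name: (expected, actual)} for every setting that differs.

    `variables` maps variable names to their values (hypothetical input:
    the rows of the SHOW VARIABLES query as a dict). The comparison
    ignores case, and a missing variable is reported with actual=None.
    """
    mismatches = {}
    for name, expected in REQUIRED_SETTINGS.items():
        actual = variables.get(name)
        if actual is None or actual.upper() != expected:
            mismatches[name] = (expected, actual)
    return mismatches


if __name__ == "__main__":
    current = dict(REQUIRED_SETTINGS, binlog_format="MIXED")
    for name, (expected, actual) in find_binlog_mismatches(current).items():
        print(f"{name}: expected {expected}, found {actual}")
```

An empty result means the server already matches the settings listed above and no configuration change is needed.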

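Notes

Setting the MySQL Session Timeouts

When an initial consistent snapshot is taken of a large database, the established connection may time out while tables are being read. You can prevent this by configuring interactive_timeout and wait_timeout in your MySQL configuration file.

  • interactive_timeout: The number of seconds the server waits for activity on an interactive connection before closing it. See the MySQL documentation for details.
  • wait_timeout: The number of seconds the server waits for activity on a non-interactive connection before closing it. See the MySQL documentation for details.

For more database settings, see the Debezium MySQL connector documentation.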

Data Type Mapping

| MySQL data type | SeaTunnel data type |
|---|---|
| BIT(1), TINYINT(1) | BOOLEAN |
| TINYINT | TINYINT |
| TINYINT UNSIGNED, SMALLINT | SMALLINT |
| SMALLINT UNSIGNED, MEDIUMINT, MEDIUMINT UNSIGNED, INT, INTEGER, YEAR | INT |
| INT UNSIGNED, INTEGER UNSIGNED, BIGINT | BIGINT |
| BIGINT UNSIGNED | DECIMAL(20,0) |
| DECIMAL(p, s), DECIMAL(p, s) UNSIGNED, NUMERIC(p, s), NUMERIC(p, s) UNSIGNED | DECIMAL(p,s) |
| FLOAT, FLOAT UNSIGNED | FLOAT |
| DOUBLE, DOUBLE UNSIGNED, REAL, REAL UNSIGNED | DOUBLE |
| CHAR, VARCHAR, TINYTEXT, MEDIUMTEXT, TEXT, LONGTEXT, ENUM, JSON | STRING |
| DATE | DATE |
| TIME | TIME |
| DATETIME, TIMESTAMP | TIMESTAMP |
| BINARY, VARBINARY, BIT(p), TINYBLOB, MEDIUMBLOB, BLOB, LONGBLOB | BYTES |
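
The mapping above can be expressed as a small lookup, which is handy when predicting the SeaTunnel schema for an existing table. This is an illustrative sketch only: to_seatunnel_type is a hypothetical helper, not the connector's own conversion code, and it covers only the types listed in the mapping. Treating a bare BIT as BIT(1) follows MySQL's default length and is an assumption about how the connector would see it.

```python
import re

# MySQL base type (length argument stripped) -> SeaTunnel type,
# transcribed from the data type mapping above.
_TYPE_MAP = {
    "TINYINT": "TINYINT",
    "TINYINT UNSIGNED": "SMALLINT", "SMALLINT": "SMALLINT",
    "SMALLINT UNSIGNED": "INT", "MEDIUMINT": "INT",
    "MEDIUMINT UNSIGNED": "INT", "INT": "INT", "INTEGER": "INT",
    "YEAR": "INT",
    "INT UNSIGNED": "BIGINT", "INTEGER UNSIGNED": "BIGINT",
    "BIGINT": "BIGINT",
    "BIGINT UNSIGNED": "DECIMAL(20,0)",
    "FLOAT": "FLOAT", "FLOAT UNSIGNED": "FLOAT",
    "DOUBLE": "DOUBLE", "DOUBLE UNSIGNED": "DOUBLE",
    "REAL": "DOUBLE", "REAL UNSIGNED": "DOUBLE",
    "CHAR": "STRING", "VARCHAR": "STRING", "TINYTEXT": "STRING",
    "MEDIUMTEXT": "STRING", "TEXT": "STRING", "LONGTEXT": "STRING",
    "ENUM": "STRING", "JSON": "STRING",
    "DATE": "DATE", "TIME": "TIME",
    "DATETIME": "TIMESTAMP", "TIMESTAMP": "TIMESTAMP",
    "BINARY": "BYTES", "VARBINARY": "BYTES", "BIT": "BYTES",
    "TINYBLOB": "BYTES", "MEDIUMBLOB": "BYTES", "BLOB": "BYTES",
    "LONGBLOB": "BYTES",
}

_DECIMAL_RE = re.compile(r"(DECIMAL|NUMERIC)\((\d+),\s*(\d+)\)( UNSIGNED)?")


def to_seatunnel_type(mysql_type):
    """Map a MySQL column type string to the SeaTunnel type in the table."""
    normalized = " ".join(mysql_type.upper().split())
    # Single-bit columns map to BOOLEAN (bare BIT defaults to BIT(1)).
    if normalized in ("BIT(1)", "TINYINT(1)", "BIT"):
        return "BOOLEAN"
    match = _DECIMAL_RE.fullmatch(normalized)
    if match:
        # Precision and scale carry over; UNSIGNED is dropped.
        return f"DECIMAL({match.group(2)},{match.group(3)})"
    # Strip a parenthesized argument such as VARCHAR(255) or ENUM('a','b').
    base = " ".join(re.sub(r"\(.*\)", "", normalized).split())
    if base not in _TYPE_MAP:
        raise ValueError(f"type not covered by the mapping: {mysql_type}")
    return _TYPE_MAP[base]
```

For example, to_seatunnel_type("int(11) unsigned") yields BIGINT, which reflects the rule in the mapping that unsigned integer types are widened to the next larger signed type.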

Source Options

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| base-url | String | | - | The URL of the JDBC connection. Example: jdbc:mysql://localhost:3306/test |
| username | String | | - | Username to use when connecting to the database server. |
| password | String | | - | Password to use when connecting to the database server. |
| database-names | List | | - | Names of the databases to monitor. |
| table-names | List | | - | Names of the tables to monitor. Each table name must include the database name, for example: database_name.table_name |
| startup.mode | Enum | No | INITIAL | Optional startup mode for the MySQL CDC consumer. Valid values are initial, earliest, latest, and specific. initial: synchronize historical data at startup, then synchronize incremental data. earliest: start from the earliest offset possible. latest: start from the latest offset. specific: start from user-supplied offsets. |
| startup.specific-offset.file | String | No | - | Start from the specified binlog file name. Required when startup.mode is specific. |
| startup.specific-offset.pos | Long | No | - | Start from the specified binlog file position. Required when startup.mode is specific. |
| stop.mode | Enum | No | NEVER | Optional stop mode for the MySQL CDC consumer. Valid values are never, latest, and specific. never: a real-time job does not stop the source. latest: stop at the latest offset. specific: stop at a user-supplied offset. |
| stop.specific-offset.file | String | No | - | Stop at the specified binlog file name. Required when stop.mode is specific. |
| stop.specific-offset.pos | Long | No | - | Stop at the specified binlog file position. Required when stop.mode is specific. |
| snapshot.split.size | Integer | No | 8096 | The split size (number of rows) of a table snapshot. Captured tables are divided into multiple splits when the snapshot is read. |
| snapshot.fetch.size | Integer | No | 1024 | The maximum fetch size per poll when reading a table snapshot. |
| server-id | String | No | - | A numeric ID or a numeric ID range for this database client. A single ID looks like 5400; a range looks like '5400-5408'. Every ID must be unique across all currently running database processes in the MySQL cluster. The connector joins the MySQL cluster as another server (with this unique ID) so that it can read the binlog. By default, a random number between 5400 and 6400 is generated, though setting an explicit value is recommended. |
| server-time-zone | String | No | UTC | The session time zone of the database server. If not set, ZoneId.systemDefault() is used to determine the server time zone. |
| connect.timeout.ms | Duration | No | 30000 | The maximum time the connector waits after trying to connect to the database server before timing out. |
| connect.max-retries | Integer | No | 3 | The maximum number of times the connector retries building a database server connection. |
| connection.pool.size | Integer | No | 20 | The JDBC connection pool size. |
| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 | The upper bound of the chunk key distribution factor, which is used to determine whether the table data is evenly distributed. The distribution factor is (MAX(id) - MIN(id) + 1) / row count. If it is less than or equal to this upper bound, the table chunks are optimized for even distribution. Otherwise the table is considered unevenly distributed, and the sampling-based sharding strategy is used if the estimated shard count exceeds sample-sharding.threshold. |
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | The lower bound of the chunk key distribution factor, which is used to determine whether the table data is evenly distributed. The distribution factor is (MAX(id) - MIN(id) + 1) / row count. If it is greater than or equal to this lower bound, the table chunks are optimized for even distribution. Otherwise the table is considered unevenly distributed, and the sampling-based sharding strategy is used if the estimated shard count exceeds sample-sharding.threshold. |
| sample-sharding.threshold | Integer | No | 1000 | The estimated shard count that triggers the sample sharding strategy. When the distribution factor is outside the bounds set by chunk-key.even-distribution.factor.upper-bound and chunk-key.even-distribution.factor.lower-bound, and the estimated shard count (approximate row count / chunk size) exceeds this threshold, the sample sharding strategy is used. This helps handle large datasets more efficiently. |
| inverse-sampling.rate | Integer | No | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, a value of 1000 means a 1/1000 sampling rate is applied. This option controls the granularity of sampling and therefore the final number of shards. It is especially useful for very large datasets where a lower sampling rate is preferred. |
| exactly_once | Boolean | No | true | Enable exactly-once semantics. |
| format | Enum | No | DEFAULT | Optional output format for MySQL CDC. Valid values are DEFAULT and COMPATIBLE_DEBEZIUM_JSON. |
| debezium | Config | No | - | Pass-through Debezium properties for the Debezium Embedded Engine, which is used to capture data changes from the MySQL server. |
| common-options | | No | - | Source plugin common parameters. See Source Common Options for details. |
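
To make the interplay between the chunk-key bounds and sample-sharding.threshold concrete, here is a minimal sketch of the decision as the option descriptions state it. It is not the connector's implementation: the function names and the returned labels are hypothetical, and the "uneven" fallback for tables below the threshold is an assumption, since the options do not say what happens in that case.

```python
def distribution_factor(min_id, max_id, row_count):
    """(MAX(id) - MIN(id) + 1) / row count, as given in the option text."""
    return (max_id - min_id + 1) / row_count


def choose_split_strategy(min_id, max_id, row_count,
                          chunk_size=8096,                 # snapshot.split.size
                          lower_bound=0.05,                # ...factor.lower-bound
                          upper_bound=100.0,               # ...factor.upper-bound
                          sample_sharding_threshold=1000):
    """Return a hypothetical label for the strategy the options imply."""
    if row_count <= 0:
        # Assumption: an empty table needs no splitting at all.
        return "single-chunk"
    factor = distribution_factor(min_id, max_id, row_count)
    if lower_bound <= factor <= upper_bound:
        # Keys are dense enough to cut evenly sized chunks by key range.
        return "even"
    estimated_shards = row_count / chunk_size
    if estimated_shards > sample_sharding_threshold:
        # Uneven and large: sample keys at 1 / inverse-sampling.rate.
        return "sample-sharding"
    # Assumption: uneven but small tables use some other uneven strategy.
    return "uneven"


if __name__ == "__main__":
    # Dense auto-increment key: factor is 1.0, inside [0.05, 100].
    print(choose_split_strategy(1, 1_000_000, 1_000_000))
    # Sparse key over a large table: factor is 100000, about 1235 shards.
    print(choose_split_strategy(1, 10**12, 10_000_000))
```

With the defaults, a table is treated as evenly distributed when its key range is between 0.05 and 100 times its row count.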

Task Example

Simple

Support multi-table reading

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    catalog = {
      factory = MySQL
    }
    base-url = "jdbc:mysql://localhost:3306/testdb"
    username = "root"
    password = "root@123"
    table-names = ["testdb.table1", "testdb.table2"]
    startup.mode = "initial"
  }
}

sink {
  Console {
  }
}
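
The job above uses startup.mode = "initial", which needs no offsets. According to the source options, switching startup.mode or stop.mode to specific makes the matching specific-offset.file and specific-offset.pos options required. A small pre-flight check along those lines might look like the sketch below; missing_offset_options is a hypothetical helper that operates on a plain dictionary of options, not on SeaTunnel's own configuration objects.

```python
def missing_offset_options(options):
    """List the offset options that a 'specific' mode requires but lacks.

    `options` is a flat dict of MySQL-CDC source options, keyed by the
    option names from the source options table.
    """
    missing = []
    for prefix in ("startup", "stop"):
        mode = str(options.get(f"{prefix}.mode", "")).lower()
        if mode != "specific":
            continue
        for suffix in ("file", "pos"):
            key = f"{prefix}.specific-offset.{suffix}"
            if options.get(key) is None:
                missing.append(key)
    return missing


if __name__ == "__main__":
    job_options = {
        "startup.mode": "specific",
        "startup.specific-offset.file": "mysql-bin.000003",
    }
    # Reports that the binlog position is still missing.
    print(missing_offset_options(job_options))
```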

Sending Debezium-compatible format to Kafka

This must be used with the Kafka connector sink; see the compatible Debezium format documentation for details.

Changelog

  • Add MySQL CDC Source Connector

next version

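A solid understanding of the MySQL CDC source connector makes the core mechanics of data synchronization easier to master and helps improve the efficiency and accuracy of data processing.

Whether for data integration, real-time analytics, or other complex data processing scenarios, the MySQL CDC source connector is a capable tool for SeaTunnel users. As data technology continues to advance, future versions can be expected to bring further innovations and optimizations that make developers' work easier.

Publication of this article was supported by 白鲸开源科技!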
