【Flink CDC(一)】实现mysql整表与增量读取

文章目录

  • 一. 运行前准备
    • 1. 依赖
      • 1.1. Maven dependency
      • 1.2. SQL Client JAR(推荐)
    • 2. 配置 MySQL 服务器(必须)
  • 二. 功能说明
    • 1. 启动模式
    • 2. 全量阶段支持 checkpoint
    • 3. 关于无主键表
    • Exactly-Once 处理
  • 三. 实战
    • 1. 实现mysql整表与增量表同步
  • FAQ

MySQL CDC 连接器允许从 MySQL 数据库读取快照数据(比如:flink任务消费时刻的整表数据)和增量数据。本文描述了如何设置 MySQL CDC 连接器来对 MySQL 数据库运行 SQL 查询。

本篇只关注mysql整表与增量读取的实现,对于并发读取等能力后续再探索。

 

一. 运行前准备

1. 依赖

1.1. Maven dependency

<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><!-- 请使用已发布的版本依赖,snapshot版本的依赖需要本地自行编译。 --><version>2.4.0</version>
</dependency>

 

1.2. SQL Client JAR(推荐)

下载 flink-sql-connector-mysql-cdc-2.4.0.jar 到 <FLINK_HOME>/lib/ 目录下。

 

2. 配置 MySQL 服务器(必须)

你必须定义一个 MySQL 用户,该用户对 MySQL CDC 连接器监视的所有数据库都应该具有所需的权限。

# 创建用户
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';# 赋权
mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';# 刷新权限
mysql> FLUSH PRIVILEGES;

注意:

scan.incremental.snapshot.enabled 参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。

 

二. 功能说明

1. 启动模式

配置选项scan.startup.mode指定 MySQL CDC 使用者的启动模式。有效枚举包括:

  • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog

  • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取

  • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog
    的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改

  • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。

  • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。

MySQLSource.builder().startupOptions(StartupOptions.earliest()) // 从最早位点启动.startupOptions(StartupOptions.latest()) // 从最晚位点启动.startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L) // 从指定 binlog 文件名和位置启动.startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")) // 从 GTID 集合启动.startupOptions(StartupOptions.timestamp(1667232000000L) // 从时间戳启动....build()CREATE TABLE mysql_source (...) WITH ('connector' = 'mysql-cdc','scan.startup.mode' = 'earliest-offset', -- 从最早位点启动'scan.startup.mode' = 'latest-offset', -- 从最晚位点启动'scan.startup.mode' = 'specific-offset', -- 从特定位点启动'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- 在特定位点启动模式下指定 binlog 文件名'scan.startup.specific-offset.pos' = '4', -- 在特定位点启动模式下指定 binlog 位置'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- 在特定位点启动模式下指定 GTID 集合'scan.startup.mode' = 'timestamp', -- 从特定位点启动'scan.startup.timestamp-millis' = '1667232000000' -- 在时间戳启动模式下指定启动时间戳...
)

 

2. 全量阶段支持 checkpoint

增量快照读取提供了在区块级别执行检查点的能力。它使用新的快照读取机制解决了以前版本中的检查点超时问题。

 

3. 关于无主键表

从2.4.0 版本开始支持无主键表,使用无主键表必须设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的一个字段。

在使用无主键表时,需要注意以下两种情况。

  1. 配置 scan.incremental.snapshot.chunk.key-column 时,如果表中存在索引,请尽量使用索引中的列来加快 select 速度。

  2. 无主键表的处理语义由 scan.incremental.snapshot.chunk.key-column 指定的列的行为决定:

  • 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。
  • 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。

 

Exactly-Once 处理

MySQL CDC 连接器是一个 Flink Source 连接器,它将首先读取表快照块,然后继续读取 binlog, 无论是在快照阶段还是读取 binlog 阶段,MySQL CDC 连接器都会在处理时准确读取数据,即使任务出现了故障。

 

三. 实战

1. 实现mysql整表与增量表同步

-- 'scan.startup.mode'= 'initial' 
-- 
CREATE TABLE tjy_sql1  
(  `id` int,  `name` string,  `face` string  ,PRIMARY KEY(id) NOT ENFORCED  
) WITH (  'connector' = 'mysql-cdc',  'hostname' = 'xxx',  'port' = '3306',  'username' = 'middle_test',  'password' = '123456',  'database-name' = 'middle_test',  'table-name' = 'tjy_fortest1'  -- ,'scan.incremental.snapshot.enabled' = 'false'  --  initial: 默认值,全表同步,然后进行增量同步;--  'scan.startup.mode'= 'initial'  -- 'debezium.snapshot.mode' = 'initial'      );  CREATE TABLE tjy_sql1_sink  (  `id` int,  `name` string,  `face` string  ,PRIMARY KEY(id) NOT ENFORCED  ) WITH (  'connector' = 'mysql-x',  'url' = 'jdbc:mysql://xxx:3306/middle_test?useunicode=true&characterEncoding=utf8&useSSL=false&useCursorFetch=true',  'username' = 'middle_test',  'password' = '123456',  'table-name' = 'flink_type',  'table-name' = 'tjy_fortest2'  );  insert into tjy_sql1_sink select * from tjy_sql1;

 

FAQ

相关问题:https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)

可能涉及到的问题

在这里插入图片描述

 

参考:
官网:https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/mysql-cdc%28ZH%29.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/492821.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python爬虫-付费代理推荐和使用

付费代理的使用 相对免费代理来说&#xff0c;付费代理的稳定性更高。本节将介绍爬虫付费代理的相关使用过程。 1. 付费代理分类 付费代理分为两类&#xff1a; 一类提供接口获取海量代理&#xff0c;按天或者按量收费&#xff0c;如讯代理。 一类搭建了代理隧道&#xff0…

nginx高级配置详解

目录 一、网页的状态页 1、状态页的基本配置 2、搭配验证模块使用 3、结合白名单使用 二、nginx 第三方模块 1、echo模块 1.1 编译安装echo模块 1.2 配置echo模块 三、nginx变量 1、内置变量 2、自定义变量 四、自定义图标 五、自定义访问日志 1、自定义日志格式…

基于Java SSM框架实现高考填报信息系统项目【项目源码】

基于java的SSM框架实现高考填报信息系统演示 JAVA简介 Java主要采用CORBA技术和安全模型&#xff0c;可以在互联网应用的数据保护。它还提供了对EJB&#xff08;Enterprise JavaBeans&#xff09;的全面支持&#xff0c;java servlet API&#xff0c;JSP&#xff08;java serv…

hot100刷题记录-哈希

一、两数之和 题目&#xff1a;https://leetcode.cn/problems/two-sum/description/?envTypestudy-plan-v2&envIdtop-100-liked 方法1&#xff1a;枚举 class Solution:def twoSum(self, nums: List[int], target: int) -> List[int]:for id, num in enumerate(nums)…

C++ //练习 8.4 编写函数,以读模式打开一个文件,将其内容读入到一个string的vector中,将每一行作为一个独立的元素存于vector中。

C Primer&#xff08;第5版&#xff09; 练习 8.4 练习 8.4 编写函数&#xff0c;以读模式打开一个文件&#xff0c;将其内容读入到一个string的vector中&#xff0c;将每一行作为一个独立的元素存于vector中。 环境&#xff1a;Linux Ubuntu&#xff08;云服务器&#xff09…

多条件查询展开收起

顶部筛选条件很多时&#xff0c;就需要对赛选条件进行分组&#xff0c;每组都要有展开收起&#xff0c;根据页面自定义&#xff0c;还需要显示收起查询中有几条查询中的内容 思路&#xff1a; 封装一个组件&#xff0c;利用插槽自定义查询条件和分组 // 子组件 <script se…

智能枪弹柜管理系统-智能枪弹管理系统DW-S306

随着社会的发展和治安形势的日益严峻&#xff0c;对于枪弹的管理变得尤为重要。传统的手工记录和存放方式已经无法满足现代化、高效化、安全化的需求。因此&#xff0c;智能枪弹柜管理系统应运而 生。 在建设万兆主干、千兆终端的监控专网的基础上&#xff0c;弹药库安全技术…

MySQL数据库进阶第四篇(视图/存储过程/触发器)

文章目录 一、视图简单介绍与基础语法二、视图的检查选项三、视图的更新四、视图的作用五、存储过程的概念与特点六、存储过程的 创建&#xff0c;调用&#xff0c;查看&#xff0c;删除七、存储过程 — 系统变量八、存储过程 — 用户定义变量九、存储过程 — 局部变量十、存储…

最便宜也能最安全:Positive SSL证书

PositiveSSL证书的优势 价格实惠&#xff1a;相较于其他品牌的SSL证书&#xff0c;PositiveSSL证书的价格更为亲民。这使得小型企业和个人网站也能够轻松采用SSL加密技术&#xff0c;提高网站的安全性。安全性能可靠&#xff1a;虽然价格便宜&#xff0c;但PositiveSSL证书在安…

CMake和VsCode调试的使用

目录 CMake使用 CMake下载 创建系统文件目录 MakeList编写规范 VsCode启动调试 添加配置文件 添加断点&#xff0c;启动调试 CMake使用 CMake下载 输入指令 sudo apt install cmake 安装cmake&#xff0c;使用 cmake -version可查看cmake的版本信息 创建系统文件目…

unity导航网格无法烘培到台阶和斜坡

如图是我在b站学Unity导航网格时建的一个示例场景&#xff0c;本场景使用的为棱长1m的立方体&#xff0c;读者可以以此为参照度量其他物体大小。 可见导航网格根本无法烘焙到斜坡和台阶上&#xff0c;为解决问题我做了不少尝试&#xff0c;调整最大坡度和步高都没办法解决问题…

图像复原天花板!IR开创性新作实现最佳视觉质量,修复更智能、更逼真

图像复原&#xff08;IR&#xff09;指在已知图像退化的原因和模型的情况下&#xff0c;通过一系列的逆过程来恢复出原始图像的过程。这是一个长期的低级视觉任务&#xff0c;也是图像处理领域的一个重要课题。 随着深度学习技术的发展&#xff0c;图像复原领域不断出现新的网…