基于 Flink CDC 构建 MySQL 的 Streaming ETL to MySQL

简介

CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。CDC 技术的应用场景非常广泛:
• 数据同步:用于备份,容灾;
• 数据分发:一个数据源分发给多个下游系统;
• 数据采集:面向数据仓库 / 数据湖的 ETL 数据集成,是非常重要的数据源。
CDC 的技术方案非常多,目前业界主流的实现机制可以分为两种:
• 基于查询的 CDC:
• 离线调度查询作业,批处理。把一张表同步到其他系统,每次通过查询去获取表中最新的数据;
• 无法保障数据一致性,查的过程中有可能数据已经发生了多次变更;
• 不保障实时性,基于离线调度存在天然的延迟。
• 基于日志的 CDC:
• 实时消费日志,流处理,例如 MySQL 的 binlog 日志完整记录了数据库中的变更,可以把 binlog 文件当作流的数据源;
• 保障数据一致性,因为 binlog 文件包含了所有历史变更明细;
• 保障实时性,因为类似 binlog 的日志文件是可以流式消费的,提供的是实时数据。
对比常见的开源 CDC 方案,我们可以发现:
• 对比增量同步能力,
• 基于日志的方式,可以很好的做到增量同步;
• 而基于查询的方式是很难做到增量同步的。
• 对比全量同步能力,基于查询或者日志的 CDC 方案基本都支持,除了 Canal。
• 而对比全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate 支持较好。
• 从架构角度去看,该表将架构分为单机和分布式,这里的分布式架构不单纯体现在数据读取能力的水平扩展上,更重要的是在大数据场景下分布式系统接入能力。例如 Flink CDC 的数据入湖或者入仓的时候,下游通常是分布式的系统,如 Hive、HDFS、Iceberg、Hudi 等,那么从对接入分布式系统能力上看,Flink CDC 的架构能够很好地接入此类系统。
• 在数据转换 / 数据清洗能力上,当数据进入到 CDC 工具的时候是否能较方便的对数据做一些过滤或者清洗,甚至聚合?
• 在 Flink CDC 上操作相当简单,可以通过 Flink SQL 去操作这些数据;
• 但是像 DataX、Debezium 等则需要通过脚本或者模板去做,所以用户的使用门槛会比较高。
• 另外,在生态方面,这里指的是下游的一些数据库或者数据源的支持。Flink CDC 下游有丰富的 Connector,例如写入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常见的一些系统,也支持各种自定义 connector。
在这里插入图片描述
在这里插入图片描述

1.安装单机版

下载

yum install -y java-1.8.0-openjdk.x86_64
yum install -y  java-1.8.0-openjdk-devel
wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.17.2/flink-1.17.2-bin-scala_2.12.tgz
mkdir -p /opt/flink
tar -zxvf flink-1.17.2-bin-scala_2.12.tgz -C /opt/flink 

下载jar复制到/opt/flink/flink-1.17.2/lib

<!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.4.2</version><scope>provided</scope>
</dependency>

配置

vim /opt/flink/flink-1.17.2/conf/flink-conf.yaml

rest.port: 8081
rest.bind-address: 0.0.0.0
jobmanager.execution.timezone: Asia/Shanghai

启动

/opt/flink/flink-1.17.2/bin/stop-cluster.sh
/opt/flink/flink-1.17.2/bin/start-cluster.sh 访问http://10.6.8.227:8081/

2.创建 两个mysql 数据库

docker run -p 13306:3306 \
-e MYSQL_ROOT_PASSWORD=mysql \
-d mysqldocker run -p 23306:3306 \
-e MYSQL_ROOT_PASSWORD=mysql \
-d mysql

初始化mysql 表结构

CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512)
);

在源库中插入数据

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");

3.CDC 步骤

启动 /opt/flink/flink-1.17.2/bin/sql-client.sh
只能一条语句一条语句的执行

CREATE TABLE products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '13306','username' = 'root','password' = 'mysql','database-name' = 'mydb','table-name' = 'products');CREATE TABLE sink_products (id INT,name STRING,description STRING,PRIMARY KEY (id) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:23306/mydb?serverTimezone=Asia/Shanghai','username' = 'root','password' = 'mysql','table-name' = 'sink_products');insert into sink_products select * from products;

4.验证

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

参考文档

http://124.220.104.235/web/chatgpt

https://ververica.github.io/flink-cdc-connectors/master/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-postgres-tutorial-zh.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/256258.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【我爱C语言】详解字符函数isdigit和字符串转换函数(atoi和snprintf实现互相转换字符串)三种strlen模拟实现

&#x1f308;write in front :&#x1f50d;个人主页 &#xff1a; 啊森要自信的主页 ✏️真正相信奇迹的家伙&#xff0c;本身和奇迹一样了不起啊&#xff01; 欢迎大家关注&#x1f50d;点赞&#x1f44d;收藏⭐️留言&#x1f4dd;>希望看完我的文章对你有小小的帮助&am…

mfc140u.dll文件下载的方法指南,教你多种方法修复mfc140u.dll

在面对诸如"mfc140u.dll文件丢失"或者"mfc140u.dll错误"等问题时&#xff0c;许多用户可能会考虑直接从互联网上下载该DLL文件来快速解决问题。确实&#xff0c;此类错误信息经常在尝试运行某些软件&#xff0c;特别是依赖于 Microsoft Visual C Redistrib…

论文阅读——Deformable ConvNets v2

论文&#xff1a;https://arxiv.org/pdf/1811.11168.pdf 代码&#xff1a;https://github.com/chengdazhi/Deformable-Convolution-V2-PyTorch 1. 介绍 可变形卷积能够很好地学习到发生形变的物体&#xff0c;但是论文观察到当尽管比普通卷积网络能够更适应物体形变&#xff…

『亚马逊云科技产品测评』活动征文|AWS云服务器EC2实例实现ByConity快速部署

授权声明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 Developer Centre, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道 前言 亚马逊是全球最大的在线零售商和云计算服务提供商。AWS云服务器在…

组合总和II(回溯、去重)

40. 组合总和 II - 力扣&#xff08;LeetCode&#xff09; 题目描述 给定一个候选人编号的集合 candidates 和一个目标数 target &#xff0c;找出 candidates 中所有可以使数字和为 target 的组合。 candidates 中的每个数字在每个组合中只能使用 一次 。 注意&#xff1a…

手把手将Visual Studio Code变成Python开发神器

Visual Studio Code 是一款功能强大、可扩展且轻量级的代码编辑器&#xff0c;经过多年的发展&#xff0c;已经成为 Python 社区的首选代码编辑器之一 下面我们将学习如何安装 Visual Studio Code 并将其设置为 Python 开发工具&#xff0c;以及如何使用 VS Code 提高编程工作…

[Harmonyos]鸿蒙操作系统架构

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

springboot084基于springboot的论坛网站

springboot084基于springboot的论坛网站 成品项目已经更新&#xff01;同学们可以打开链接查看&#xff01;需要定做的及时联系我&#xff01;专业团队定做&#xff01;全程包售后&#xff01; 2000套项目视频链接&#xff1a;https://pan.baidu.com/s/1N4L3zMQ9nNm8nvEVfIR…

扫描器的使用

漏扫器 注意事项 扫描器会给客户的业务造成影响。比如&#xff0c;如果存在sql注入漏洞&#xff08;重大的漏洞&#xff09;的话&#xff0c;会给客户的数据库插入脏数据&#xff0c;后果很严重 主机漏扫 针对IP地址和网段的漏洞扫描&#xff0c;例如&#xff1a;22端口弱口…

【项目问题解决】IDEA2020.3 使用 lombok 插件 java: 找不到符号 符号: 方法 builder()

目录 lombok找不到符号问题修改 1.问题描述2.问题原因3.解决思路4.解决方案5.总结6.参考 文章所属专区 项目问题解决 1.问题描述 IDEA2020.3 使用 lombok 插件 java: 找不到符号 符号: 方法 builder()&#xff0c;无法使用lombok下应有的注解&#xff0c;一度怀疑是版本问题 …

【web开发网页制作】Html+Css网页制作关于明星介绍王嘉尔(5页面)【附源码下载】

htmlcss网页制作目录 写在前面涉及知识效果展示1、网页构思2、网页实现2.1 首页2.2 关于我2.3 成长经历2.4 朋友2.5 爱好 3、源码分享 写在前面 接着分享哈&#xff0c;还是学生时代的库存&#xff0c;当时是为了不同风格的素材&#xff0c;所以自己选择了多个方向的主题来练习…

使用python操作excel文档

导入xlsxwriter包 python轻量化的语言&#xff0c;用来操作文档简直易如反掌&#xff0c;首先你需要导入的是import xlsxwriter包&#xff0c;他包括了操作文档所需要的全部工具方法&#xff0c;你只需要调用就好了。 操作excel指南 首先你需要创建一个文件xlsxwriter.Workb…