CDC 整合方案:MySQL > Flink CDC > Kafka > Hudi

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

继上一篇 《CDC 整合方案:MySQL > Kafka Connect + Schema Registry + Avro > Kafka > Hudi》 讨论了一种典型的 CDC 集成方案后,本文,我们改用 Flink CDC 完成同样的 CDC 数据入湖任务。与上一个方案有所不同的是:借助现有的 Flink 环境,我们可以直接使用 Flink CDC 从源头数据库接入数据,所以这是一个完整的端到端的解决方案,而上一篇文章我们省略了搭建 Kafka Connect + Debezium MySQL Connector 采集 CDC 数据的环节,因为这部分操作确实很复杂,很难在一篇文章中详细展开,这也说明了使用 Flink CDC 的一个优势,那就是:Flink CDC 在应用和架构上确实要比 Kafka Connect + Debezium MySQL Connector 的组合简单很多,如果你需要,甚至可以跳过 Kafka 直接将数据落到数据湖上。

1. 环境准备


  • 本文依旧使用 Debezium 官方提供的一个 MySQL Docker镜像,构建操作可参考其 官方文档,使用的是其内置的 inventory 数据库;
    (https://docs.confluent.io/platform/7.4/installation/docker/config-reference.html#sr-long-configuration);

  • 我们需要安装多个 Flink Connector 和 Format 组件,包括:Flink CDC MySQL Connector、Flink Hudi Connector、Flink ‘debezium-avro-confluent’ Format Support,关于这些组件的安装,已经全部记录在了《Flink SQL Client 安装各类 Connector、Format 组件的方法汇总(持续更新中…)》一文中,请移步此文选择需要的组件酌情安装;

  • 安装好各种依赖组件后,执行如下脚本,清空 Hudi 表目标位置上的文件,停止正在运行中的 Yarn App,并启动一个新的 Flink Yarn Session:

    echo "clean kafak topic..."
    # run on a host installed kafka console client
    kafka-topics.sh --bootstrap-server 'b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092' --delete --topic 'orders_kafka_debezium_json'
    echo "clean hudi table target location..."
    aws s3 rm --recursive s3://glc-flink-hudi-test/sink_hudi_orders
    echo "Kill all running apps..."
    for appId in $(yarn application -list -appStates RUNNING 2>1 | awk 'NR > 2 { print $1 }'); doyarn application -kill $appId &> /dev/null
    done
    echo "start a flink yarn session..."
    flink-yarn-session -d
    
  • 清理 Kafka 中的 Topic (二次测试时会上次写入的消息会影响测试结果)

    export KAFKA_BOOTSTRAP_SERVERS='b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092'
    kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list
    kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --delete --topic 'orders.*'
    kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS --list
    
  • 启动 Flink SQL Client

    /usr/lib/flink/bin/sql-client.sh embedded shell
    

2. 创建 Flink CDC 源表


本文依旧使用 Debezium 官方提供的一个 MySQL Docker镜像,构建操作可参考其 官方文档,使用的是其内置的 inventory 数据库。在一个既有的 Flink 环境上提前安装 Kafka 和 Flink CDC Connector 以及 Debezium Json Format,具体安装方法参考:Flink SQL Client 安装各类 Connector、组件的方法汇总(持续更新中…)。环境准备好后,打开 Flink SQL Client,执行如下建表语句:

SET 'sql-client.execution.result-mode' = 'TABLEAU';DROP TABLE IF EXISTS orders_mysql_cdc;CREATE TABLE IF NOT EXISTS orders_mysql_cdc (`order_number` INT NOT NULL,`order_date` DATE NOT NULL,`purchaser` INT NOT NULL,`quantity` INT NOT NULL,`product_id` INT NOT NULL,CONSTRAINT `PRIMARY` PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '10.0.13.30','port' = '3307','username' = 'root','password' = 'Admin1234!','database-name' = 'inventory','table-name' = 'orders'
);-- SELECT * FROM orders_mysql_cdc;

3. 创建 Kafka 中间表 ( debezium-json 格式)


Kafka 中间表使用 kafka connector + debezium-json 格式,也就是 《Flink CDC 与 Kafka 集成:State Snapshot 还是 Changelog?Kafka 还是 Upsert Kafka?》一文 第 《4. 测试组合:'connector'='kafka' + 'format'='debezium-json'》一节介绍的方式:

DROP TABLE IF EXISTS orders_kafka_debezium_json;CREATE TABLE IF NOT EXISTS orders_kafka_debezium_json (order_number int,order_date   date,purchaser    int,quantity     int,product_id   int
) WITH ('connector' = 'kafka','topic' = 'orders_kafka_debezium_json','properties.bootstrap.servers' = 'b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092','properties.group.id' = 'orders_kafka_debezium_json','scan.startup.mode' = 'earliest-offset','format' = 'debezium-json'
);insert into orders_kafka_debezium_json select * from orders_mysql_cdc;-- select * from orders_kafka_debezium_json;

4. 创建 Hudi 目标表


Kafka 中流式注入 debezium-json 格式的 CDC 消息后,就可以写入 Hudi 表了,Hudi 表使用 Hudi Connector 创建,必须使用流式读取(‘read.streaming.enabled’=‘true’),且必须开启 changelog 模式(‘changelog.enabled’ = ‘true’):

SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.interval' = '2s';
SET 'state.backend' = 'rocksdb';
SET 'state.backend.incremental' = 'true';
SET 'state.checkpoints.num-retained' = '10';DROP TABLE IF EXISTS sink_hudi_orders;CREATE TABLE IF NOT EXISTS sink_hudi_orders (order_number int PRIMARY KEY NOT ENFORCED,order_date   date,purchaser    int,quantity     int,product_id   int
) WITH ('connector' = 'hudi','path' = 's3://glc-flink-hudi-test/sink_hudi_orders','table.type' = 'MERGE_ON_READ','changelog.enabled' = 'true','read.streaming.enabled'='true','read.streaming.check-interval' = '2','read.streaming.skip_compaction' = 'true','read.streaming.start-commit' = 'earliest'
);insert into sink_hudi_orders select * from orders_kafka_debezium_json;select * from sink_hudi_orders;

5. 完整演示


2024-02-06_12-06-25


关联阅读

《CDC 整合方案:MySQL > Flink CDC + Schema Registry + Avro > Kafka > Hudi》

《CDC 整合方案:MySQL > Flink CDC > Kafka > Hudi》

《CDC 整合方案:MySQL > Kafka Connect + Schema Registry + Avro > Kafka > Hudi》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/478942.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【数据结构】每天五分钟,快速入门数据结构(一)——数组

目录 一.初始化语法 二.特点 三.数组中的元素默认值 四.时间复杂度 五.Java中的ArrayList类 可变长度数组 1 使用 2 注意事项 3 实现原理 4 ArrayList源码 5 ArrayList方法 一.初始化语法 // 数组动态初始化(先定义数组,指定数组长度&#xf…

移动通信相关知识学习笔记

一、移动通信架构简图 移动无线的接入网是专指各种基站设备。核心网就是各种交换机。 二、无线信号基本原理 无线网络中,使用AP设备和天线来实现有线和无线信号互相转换。如上图所示,有线网络侧的数据从AP设备的有线接口进入AP后,经AP处理为…

电子邮件是什么意思?电子邮箱有什么作用?

电子邮件的定义是什么?怎么申请电子邮箱比较好? 伴随着科技的飞速发展,逐渐渗透到我们的工作、学习和生活中。那么,电子邮件到底是什么意思呢?它又是如何影响我们的日常生活的呢?接下来,就让蜂…

C# CAD交互界面-自定义面板集-comboBox选择图层

运行环境Visual Studio 2022 c# cad2016 一、代码说明 SelectLayer方法是一个自定义的AutoCAD命令方法,通过[CommandMethod("SelectLayer")]进行标记。方法首先获取当前活动文档,并检查是否有效。创建一个名为"SelectLayer"的Pale…

即时设计好用吗?怎么用?

进行 UI 设计,除了选择合适的工具,如果能提前掌握一些软件的使用技巧,提高每一步的操作效率,积累起来也能提高整体设计效率。所以这篇文章想给大家介绍一下国内高效的在线设计工具。 即时设计,让你的 UI 设计效率快到飞…

线程池:优化多线程管理的利器

引言 同步和异步想必各位都有了解,同步简单来说就是一件事做完再去做下一件;异步则是不用等一件事做完,就可以去做另一件事,当一件事完成后可以收到对应的通知;异步一般应用于一些耗时较长的操作,比如大型…

分布式锁的应用场景及实现

文章目录 分布式锁的应用场景及实现1. 应用场景2. 分布式锁原理3. 分布式锁的实现3.1 基于数据库 分布式锁的应用场景及实现 1. 应用场景 电商网站在进行秒杀、特价等大促活动时,面临访问量激增和高并发的挑战。由于活动商品通常是有限库存的,为了避免…

LabVIEW读取excel日期

LabVIEW读取excel日期 | Excel数据表格中有日期列和时间列,如下表所示: 通过LabVIEW直接读取Excel表格数据,读出的日期列和时间列数据与原始表格不一致,直接读出来的数据如下表所示: 日期、时间列数据异常 问题产生原因…

Sora新视角:从介绍到商业价值,全面解读优势

关于作者 还是大剑师兰特:曾是美国某知名大学计算机专业研究生,现为航空航海领域高级前端工程师;CSDN知名博主,GIS领域优质创作者,深耕openlayers、leaflet、mapbox、cesium,canvas,webgl&…

使用Sora部署实时音视频通信应用实战项目

一、项目概述 本项目将构建一个在线教学平台,实现教师与学生之间的实时音视频通信。平台将提供教师上传课件、发起授课邀请,学生加入课堂、实时互动等功能。通过使用Sora,我们将确保音视频通信的稳定、流畅和低延迟。 目录 一、项目概述 二…

目录IO 2月19日学习笔记

1. lseek off_t lseek(int fd, off_t offset, int whence); 功能: 重新设定文件描述符的偏移量 参数: fd:文件描述符 offset:偏移量 whence: SEEK_SET 文件开头 SEE…

linux基础命令和示例

redis在go语言中的使用 以下说明以读者有redis基础的前提下进行 未学习redis的可以到b站1小时浅学redis了解大概,学会如何使用 【GeekHour】一小时Redis教程_哔哩哔哩_bilibili 以下开发环境以windows为测试环境,旨在练习redis在go语言中的使用 red…