Flink实时数仓同步:实时表实战详解

一、背景

在大数据领域,初始阶段业务数据通常被存储于关系型数据库,如MySQL。然而,为满足日常分析和报表等需求,大数据平台采用多种同步方式,以适应这些业务数据的不同存储需求。这些同步存储方式包括离线仓库和实时仓库等,选择取决于业务需求和数据特性。

一项常见需求是,业务使用人员需要大数据分析平台中实时查看业务表数据,示例如下:

  1. [Mysql] 业务数据 - 用户表全量数据:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom3332023-06-01 13:00:002023-06-01 13:00:00
  1. [Mysql] 2023-06-02 业务数据新增了一名用户,且更改了tom的手机号,此时表数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00

加粗为更新/新增数据

  1. [大数据平台] 2023-06-02日业务人员在大数据平台中查看用户表实时数据,期望数据和Mysql业务数据一致,如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00

根据上述需求,我们可以得出需要构建实时表以满足业务数据的实时分析需求。

二、技术架构

为了实现上述需求,我们可以利用实时同步任务将业务数据实时同步至下游的 MPP(Massively Parallel Processing)库,从而构建实时表。结合市场上常见的技术组件,本文选择了实时引擎 FlinkCDC 和 Doris(MPP)库作为实时同步技术架构。整体架构如下:

在这里插入图片描述

三、实现方式

FlinkCDC 提供了三种实现方式,具体如下:

  1. Flink run jar 模式: 这种模式适用于处理复杂的流数据。当使用简单的 Flink SQL 无法满足复杂业务需求时(例如拉链表等),可以通过编写自定义逻辑的方式,将其打包成 Jar 包并运行。以下是一个示例:
// 示例代码
public class MySqlSourceExample {public static void main(String[] args) throws Exception {// 配置数据源和处理逻辑...// 实时任务启动env.execute("Print MySQL Snapshot + Binlog");}
}

更多信息:MysqlCDC connector

  1. sql脚本模式: bin/sql-client -f file ,这种模式适用于简单的流水任务,例如实时表同步等简单的 ETL 任务。你可以通过编写 SQL 文件并使用 Flink SQL 客户端执行,而无需编写额外的 Java 代码。以下是一个示例:
# 示例 mysql2doris SQL 文件
set 'execution.checkpointing.interval'='30000';create table mysql_user( 
# ...
) WITH ( 
# ...
);create table doris_user( 
# ...
) WITH ( 
# ...
);insert into doris_user select * from mysql_user;

执行如下:

$> bin/sql-client.sh --file /usr/local/flinksql/mysql2doris

更多信息:FlinkSQL 客户端

  1. FlinkCDC Pipeline: 这是 FlinkCDC 3.0 版本引入的全新功能,旨在通过简单的配置即可实现数据同步,无需编写复杂的 Flink SQL。缺点是需要使用 Flink 版本 1.16 或更高版本。以下是一个示例:
# 示例配置文件
source:type: mysqlname: MySQL Sourcehostname: 127.0.0.1port: 3306username: adminpassword: passtables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*server-id: 5401-5404sink:type: dorisname: Doris Sinkfenodes: 127.0.0.1:8030username: rootpassword: passpipeline:name: MySQL to Doris Pipelineparallelism: 4

执行如下:

$> bin/flink-cdc.sh mysql-to-doris.yaml

更多信息:FlinkCDC Pipeline

这三种方式各有优劣,可以根据具体需求和场景选择合适的实现方式。考虑到前几篇 Flink 实时数仓同步相关博客都采用了 Jar 包形式,为了给读者带来不同的体验,本文采用 sql脚本模式 模式来实现背景需求。

四、sql脚本模式 + 实时表实现

4.1、实时表设计

背景需求需要实时查看业务表数据,因此在Doris中设计表结构时采用了Unique数据模型。建表语句如下:

CREATE TABLE `example_user_real`
(`id` INT NOT NULL COMMENT '用户id',`name` STRING NULL COMMENT '用户昵称',`phone` STRING NULL COMMENT '手机号',`gender` CHAR(5) NULL COMMENT '用户性别',`create_time` DATETIMEV2(0) NULL COMMENT '用户注册时间',`update_time` DATETIMEV2(0) NULL COMMENT '用户更新时间'
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT '用户实时表'
DISTRIBUTED BY HASH(id) BUCKETS AUTO;

关于mysql type 转换 doris type 可参考 Doris 源码内置转换工具

4.2、实时同步逻辑

  1. 首先,由于实时流水表同步使用Flink-cdc读取关系型数据库,flink-cdc提供了四种模式: “initial”,“earliest-offset”,“latest-offset”,“specific-offset” 和 “timestamp”。本文使用的Flink-connector-mysq是2.3版本,这里简单介绍一下这四种模式:

    • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
    • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
    • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
    • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
    • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
  2. 本文采用initial模式同步任务

  3. 编写mysql2doris SQL文件,这里需要注意的是类型转换:由于 mysql2doris 是 Flink SQL 文件,故需要将 mysql type -> flink type 以及 doris type -> flink type,示例如下:

set 'execution.checkpointing.interval'='30000';
set 'state.checkpoints.dir'='file:///home/finloan/flink-1.16.1/checkpoint/mysql2doris';create table mysql_user(
`id` INT,
`name` STRING,
`phone` STRING,
`gender` CHAR(5),
`create_time` TIMESTAMP(0),
`update_time` TIMESTAMP(0),
PRIMARY KEY(id) NOT ENFORCED
) WITH ( 
'connector'='mysql-cdc',
'hostname'='10.185.163.177',
'port' = '80',
'username'='rouser',
'password'='123456',
'database-name' = 'database',
'table-name'='user'
);create table doris_user(
`id` INT,
`name` STRING,
`phone` STRING,
`gender` STRING,
`create_time` TIMESTAMP(0),
`update_time` TIMESTAMP(0)
) WITH ( 
'password'='password',
'connector'='doris',
'fenodes'='11.113.208.103:8030',
'table.identifier'='database.user',
'sink.label-prefix'='唯一任务标识,每次启动都要唯一',
'username'='username' 
);insert into doris_user select * from mysql_user;

类型转换参考:

Doris & Flink Column Type Mapping

Mysql CDC Data Type Mapping

  1. 执行命令如下:此时任务已经提交到flink 集群,本文中使用的是Flink-Cluster 模式而非yarn模式
$> ./sql-client.sh -f  ~/mysql2dorisFlink SQL> [INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 5c683fba8567e65509870a6db4e99fa5
  1. 登录flinkUi界面查看任务,如下所示:

在这里插入图片描述

  1. 此时Doris 数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom3332023-06-01 13:00:002023-06-01 13:00:00
  1. [Mysql]业务数据2023-06-02日新增了一名tony用户,且更改了tom的手机号,此时表数据如下:
idnamephonegendercreate_timeupdate_time备注
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00(手机号从333->444)
4tony5552023-06-02 10:00:002023-06-02 10:00:00(新增tony用户)
  1. 此时Doris 数据如下:
idnamephonegendercreate_timeupdate_time
1jack1112023-06-01 13:00:002023-06-01 13:00:00
2jason2222023-06-01 13:00:002023-06-01 13:00:00
3tom4442023-06-01 13:00:002023-06-02 09:00:00
4tony5552023-06-02 10:00:002023-06-02 10:00:00

五、总结

本文介绍了实时数仓同步的实操案例,通过 FlinkCDC 和 Doris 实现了实时表的构建和数据同步。在实现过程中,尤其压迫注意数据类型的转换问题,以确保不同数据存储之间的兼容性。

此外要根据具体需求和场景,选择合适的实现方式,本文选择了 sql-client --f file 模式来实现实时表需求,旨在为读者提供了不同的实践体验。

六、相关资料

  • Doris 数据模型
  • Flink Doris Connector
  • FlinkCDC Pipeline
  • FlinkSQL 客户端
  • Flink Run jar 模式
  • Doris 源码内置转换工具
  • Doris & Flink Column Type Mapping
  • Mysql CDC Data Type Mapping

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/527329.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

VUE3 使用axios网络请求

1.新建工程 参考,VUE3 环境搭建:https://blog.csdn.net/LQ_001/article/details/136293795,运行命令 vue create vue-demo 2.引入axios 不管何种引用,都要在工程中安装 axios 包。安装命令:npm install --save axio…

python的scripts文件夹作用

Windows系统: Scripts文件夹通常位于Python的安装目录下,如C:\Python\Scripts。该文件夹内包含了各种有用的工具,例如pip、virtualenv等,这些工具有助于管理和配置Python环境和依赖包。 Linux系统: 在Linux系统中&…

flink重温笔记(十四): flink 高级特性和新特性(3)——数据类型及 Avro 序列化

Flink学习笔记 前言:今天是学习 flink 的第 14 天啦!学习了 flink 高级特性和新特性之数据类型及 avro 序列化,主要是解决大数据领域数据规范化写入和规范化读取的问题,avro 数据结构可以节约存储空间,本文中结合企业真…

[BJDCTF2020]----EzPHP

文章目录 pass-1pass-2pass-3pass-4pass-5pass-6pass-7 查看题目&#xff0c;右键源代码&#xff0c;发现GFXEIM3YFZYGQ4A&#xff0c;base64解码&#xff1a;1nD3x.php 访问1nD3x.php&#xff0c;代码审计&#xff0c;一步一步分析 <?php highlight_file(__FILE__); error…

C++的类与对象(三):构造函数、析构函数、对象的销毁顺序

目录 类的6个默认成员函数 构造函数 语法 特性 析构函数 特性 对象的销毁顺序​​​​​​​​​​​​​​ 类的6个默认成员函数 问题&#xff1a;一个什么成员都没的类叫做空类&#xff0c;空类中真的什么都没有吗&#xff1f; 基本概念&#xff1a;任何类在什么都不…

基于范围的for循环(C++11)和auto

auto C11中&#xff0c;标准委员会赋予了auto全新的含义即&#xff1a; auto不再是一个存储类型指示符&#xff0c;而是作为一个新的类型 指示符来指示编译器&#xff0c;auto声明的变量必须由编译器在编译时期推导而得。 int a 10;auto b a;auto c a;auto d TestAuto(…

第三周组会——动态多目标优化算法

首先对上周写的DF测试函数进行了优化和增加 DF4 pf: DF5测试函数PF DF6 遇到的问题,在算法问题的参数taut(变化频率)默认是10数字变小时就算是9,算法会跟不上收敛 新读的文献 A Novel Dynamic Multiobjective Optimization Algorithm With Hierarchical Response System 一…

基于Vue的娱讯移动端APP前端设计与实现

目 录 摘 要 Abstract 引 言 1绪论 1.1课题背景及目的 1.1.1移动端APP发展简介 3 1.1.2移动端APP的优势 3 1.2前端开发相关技术 1.2.1前端开发工具介绍 3 1.2.2 前端开发相关技术介绍 4 1.3本章小结 2系统分析 2.1功能需求分析 2.2系统工作流程 2.3本章小结 3系统设…

JDK 17:Java生态系统的最新巨擘

JDK 17&#xff1a;Java生态系统的最新巨擘 &#x1f680; JDK 17&#xff1a;Java生态系统的最新巨擘 &#x1f680;摘要 &#x1f31f;引言 &#x1f308;模块一&#xff1a;性能优化与提升 &#x1f527;垃圾回收器的改进&#xff1a;JIT编译器的优化&#xff1a;其他性能优…

基于河马优化算法(Hippopotamus optimization algorithm,HO)的无人机三维路径规划

一、无人机路径规划模型介绍 二、算法介绍 河马优化算法&#xff08;Hippopotamus optimization algorithm&#xff0c;HO&#xff09;由Amiri等人于2024年提出&#xff0c;该算法模拟了河马在河流或池塘中的位置更新、针对捕食者的防御策略以及规避方法。2024最新算法&#x…

java中移位<< >> <<< |数据类型转换

移位 x64转换二进制&#xff1a;100 0000 左移2位 &#xff1a; 1000 0000 0 对应十进制 i 256 >>右移 <<左移 >>无符号位右移 关于右移一位相当于整除2 数据类型及其转换 基本数据类型&#xff0c;数据类型范围 byte(-128~127)&#xff08;-2^7~2…

Java Socket:飞鸽传书的网络套接字

套接字&#xff08;Socket&#xff09;是一个抽象层&#xff0c;应用程序可以通过它发送或接收数据&#xff1b;就像操作文件那样可以打开、读写和关闭。套接字允许应用程序将 I/O 应用于网络中&#xff0c;并与其他应用程序进行通信。网络套接字是 IP 地址与端口的组合。 01、…