Flink实时数仓同步:切片表实战详解

一、背景

在大数据领域,初始阶段业务数据通常被存储于关系型数据库,如MySQL。然而,为满足日常分析和报表等需求,大数据平台采用多种同步方式,以适应这些业务数据的不同存储需求。

一项常见需求是,业务使用人员需要大数据分析平台中实时查看业务表中某一维度的相应数据数据,示例如下:

  1. [Mysql] 业务数据 - 假设我们有一个订单表(也称为事实表),记录了公司的销售订单信息。该表包含以下字段:订单ID、客户ID、产品ID、销售日期、销售数量和销售额等。:
订单ID客户ID产品ID销售日期销售数量销售额
1100120012022-01-013150
2100220022022-01-02280
3100320012022-01-03150
4100120032022-01-045250
5100220022022-01-054160
  1. [大数据平台] - 业务人员希望按照客户ID维度聚合销售数量和销售额,以便实时分析每个客户的销售情况,如下:
客户ID销售数量总计销售额总计
10018400
10026240
1003150
  1. [Mysql] 业务数据 - 新增了两条订单数据,如下:
订单ID客户ID产品ID销售日期销售数量销售额
1100120012022-01-013150
2100220022022-01-02280
3100320012022-01-03150
4100120032022-01-045250
5100220022022-01-054160
6100320012022-01-06150
7100420012022-01-06150

加粗为更新/新增数据

  1. [大数据平台] - 此时每个客户的销售情况,如下:
客户ID销售数量总计销售额总计
10018400
10026240
10032100
1004150

加粗为更新/新增数据

根据上述需求,我们可以得出需要构建实时切片表以满足业务数据的实时分析需求。

切片表也叫维度表,是根据基础表(事实表)某个维度或多个维度对事实数据进行汇总计算,并展示为一个交叉分析的表格。与事实表相比,切片表的数据更加聚合,只包含某些维度或者满足某些特定条件的数据。

二、技术架构

为了实现上述需求,我们可以利用实时同步任务将业务数据实时同步至下游的 MPP(Massively Parallel Processing)库,从而构建切片表。结合市场上常见的技术组件,本文选择了实时引擎 FlinkCDC 和 Doris(MPP)库作为实时同步技术架构。整体架构如下:

在这里插入图片描述

三、设计方案

从背景需求不难看出只需实现切片表即可满足需求,但是在flink + Mpp库中却可以有多种方案,可分为三种,具体如下:

3.1、FlinkCDC + FlinkSQL状态计算

该方案利用了FlinkCDC实时捕获业务数据,并在Flink内部进行有状态的计算,例如聚合查询等操作。这种方法依赖于Checkpoint分布式快照,确保精确一次性的处理。最终,计算得到的聚合结果会实时地下沉到下游MPP库中,使业务人员能够直接查询切片表数据。示例如下:

-- flink cdc 读取订单表
create table mysql_order( 
# ...
) WITH ( 
# ...
);-- flink sql doris 
create table doris_order( 
# ...
) WITH ( 
# ...
);-- flink sql 
insert into doris_order select 客户ID, sum(销售数量), sum(销售额) from mysql_order group by 客户ID;
  • 优点:
    • 实现了实时捕获和处理业务数据,保证了数据的准确性和实时性。
    • 利用了Flink的状态计算能力,使得处理逻辑更加灵活和高效。
  • 缺点:
    • 下游Doris表结构固定,无法灵活满足用户对不同维度的查询需求。

3.2、FlinkCDC + Doris Aggregate 模型

这种方法利用了Doris Aggregate聚合模型,实现了切片表的功能。在Doris Aggregate聚合模型中,数据会在每批次导入时进行内部聚合,从而无需上游有状态计算。只需将聚合后的数据下沉至Doris数据库即可。

以下是一个示例的Doris建表语句:

-- Doris aggregate 建表语句
CREATE TABLE IF NOT EXISTS example_db.example_order_agg
(`客户ID` LARGEINT NOT NULL COMMENT "客户ID",`销售数量总计` BIGINT SUM DEFAULT "0" COMMENT "销售数量总计",`销售额总计` BIGINT SUM DEFAULT "0" COMMENT "销售额总计"
)
AGGREGATE KEY(`客户ID`)
DISTRIBUTED BY HASH(`客户ID`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);

更多信息:Doris Aggregate 模型

  • 优点:
    • 通过FlinkCDC实现了实时捕获和处理业务数据,确保了数据的准确性和实时性。
    • 利用了Doris aggregate模型进行聚合查询,将聚合压力下沉至下游。
  • 缺点:
    • 下游Doris表结构固定,无法灵活满足用户对不同维度的查询需求。

3.3、FlinkCDC + 实时表 + OLAP查询

这种方案充分利用了Doris的OLAP能力,只需建立一个实时表,业务人员便可根据需要自定义查询语句进行查询。

以下是一个示例的实现:

-- flink cdc 读取订单表
create table mysql_order( 
# ...
) WITH ( 
# ...
);-- flink sql doris 
create table doris_order( 
# ...
) WITH ( 
# ...
);-- flink sql 实时同步
insert into doris_order select * from mysql_order;-- 业务人员查询
select 客户ID, sum(销售数量), sum(销售额) from doris_order group by 客户ID;

对于实时表的具体实现,可参考笔者另一篇文章:Flink实时数仓同步:实时表实战详解

  • 优点:
    • 利用 FlinkCDC 实现了实时捕获和处理业务数据,确保了数据的准确性和实时性。
    • 借助 Doris 的 OLAP查询能力,将聚合压力下沉至下游,提高了系统的性能和稳定性。
    • 无需固定 Doris 表结构,可以灵活满足用户对不同维度的查询需求。
  • 缺点:
    • 当数据量巨大时可能存在一定查询延迟问题。
    • 可能存在并发查询效率降低问题,需要合理规划和调整查询策略。

3.4、总结

针对不同的需求场景,我们需要选择最合适的实现方案。通常情况下,对于固定的聚合查询需求,比如定期汇总统计,FlinkCDC + Doris Aggregate 模型FlinkCDC + FlinkSQL状态计算 是更为合适的选择。而对于需要更灵活查询的情况,FlinkCDC + 实时表 则更加适用。

然而,最终的选择取决于具体的业务需求和场景特点。结合以上几种实现设计,笔者更倾向于 FlinkCDC + 实时表 这种方式。我已经在另一篇博客中详细描述了该实现方式:Flink实时数仓同步:实时表实战详解。

故本文将采用FlinkCDC + FlinkSQL有状态计算实现设计,旨在给读者带来不同的体验。

四、实现方式

设计方案确定后我们还需要考虑实现方式,FlinkCDC 提供了三种实现方式,具体如下:

  1. Flink run jar 模式: 这种模式适用于处理复杂的流数据。当使用简单的 Flink SQL 无法满足复杂业务需求时(例如拉链表等),可以通过编写自定义逻辑的方式,将其打包成 Jar 包并运行。以下是一个示例:
// 示例代码
public class MySqlSourceExample {public static void main(String[] args) throws Exception {// 配置数据源和处理逻辑...// 实时任务启动env.execute("Print MySQL Snapshot + Binlog");}
}

更多信息:MysqlCDC connector

  1. sql脚本模式: bin/sql-client -f file ,这种模式适用于简单的流水任务,例如实时表同步等简单的 ETL 任务。你可以通过编写 SQL 文件并使用 Flink SQL 客户端执行,而无需编写额外的 Java 代码。以下是一个示例:
-- 示例 mysql2doris SQL 文件
set 'execution.checkpointing.interval'='30000';create table mysql_order(
# ...
) WITH ( 
# ...
);create table doris_order( 
# ...
) WITH ( 
# ...
);insert into doris_order select 客户ID, sum(销售数量), sum(销售额) from mysql_order group by 客户ID;

执行如下:

$> bin/sql-client.sh --file /usr/local/flinksql/mysql2doris

更多信息:FlinkSQL 客户端

  1. FlinkCDC Pipeline: 这是 FlinkCDC 3.0 版本引入的全新功能,旨在通过简单的配置即可实现数据同步,无需编写复杂的 Flink SQL。缺点是需要使用 Flink 版本 1.16 或更高版本。以下是一个示例:
# 示例配置文件
source:type: mysqlname: MySQL Sourcehostname: 127.0.0.1port: 3306username: adminpassword: passtables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*server-id: 5401-5404sink:type: dorisname: Doris Sinkfenodes: 127.0.0.1:8030username: rootpassword: passpipeline:name: MySQL to Doris Pipelineparallelism: 4

执行如下:

$> bin/flink-cdc.sh mysql-to-doris.yaml

更多信息:FlinkCDC Pipeline

这三种方式各有优劣,可以根据具体需求和场景选择合适的实现方式。考虑到前几篇 Flink 实时数仓同步相关博客都采用了 Jar 包形式,为了给读者带来不同的体验,本文采用 sql脚本模式 模式来实现背景需求。

五、FlinkCDC + FlinkSQL状态计算实现

5.1、Doris切片表设计

由于FlinkSQL完成聚合计算,因此在Doris中设计表结构时采用了Unique数据模型。建表语句如下:

CREATE TABLE `example_order_slice`
(`user_id` INT NOT NULL COMMENT '客户id',`sale_count` BIGINT NULL COMMENT '销售数量总计',`sale_total` BIGINT NULL COMMENT '销售金额总计'
) ENGINE=OLAP
UNIQUE KEY(`user_id`)
COMMENT '订单切片表'
DISTRIBUTED BY HASH(user_id) BUCKETS AUTO;

关于mysql type 转换 doris type 可参考 Doris 源码内置转换工具

5.2、实时同步逻辑

  1. 首先,由于实时流水表同步使用Flink-cdc读取关系型数据库,flink-cdc提供了四种模式: “initial”,“earliest-offset”,“latest-offset”,“specific-offset” 和 “timestamp”。本文使用的Flink-connector-mysq是2.3版本,这里简单介绍一下这四种模式:

    • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
    • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
    • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
    • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
    • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
  2. 本文采用initial模式同步任务

  3. 编写mysql2doris SQL文件,这里需要注意的是类型转换:由于 mysql2doris 是 Flink SQL 文件,故需要将 mysql type -> flink type 以及 doris type -> flink type,示例如下:

set 'execution.checkpointing.interval'='30000';
set 'state.checkpoints.dir'='file:///home/finloan/flink-1.16.1/checkpoint/mysql2doris';create table mysql_order(`id` INT,`user_id` INT,`sale_id` INT,`sale_time` TIMESTAMP(0),`sale_quantity` BIGINT,`sales_volume` BIGINT,PRIMARY KEY(id) NOT ENFORCED
) WITH ('connector'='mysql-cdc','hostname'='10.185.163.177','port' = '80','username'='rouser','password'='123456','database-name' = 'database','table-name'='table'
);create table doris_order(`user_id` INT,`sale_count` BIGINT,`sale_total` BIGINT
) WITH ('password'='password','connector'='doris','fenodes'='11.113.208.103:8030','table.identifier'='database.table','sink.label-prefix'='任务唯一标识,每次启动都要更换','username'='username'
);insert into doris_order select user_id, sum(sale_quantity), sum(sales_volume) from mysql_order group by user_id;

类型转换参考:

Doris & Flink Column Type Mapping

Mysql CDC Data Type Mapping

  1. 执行命令如下:此时任务已经提交到flink 集群,本文中使用的是Flink-Cluster 模式而非yarn模式
$> ./sql-client.sh -f  ~/mysql2dorisFlink SQL> [INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 5c683fba8567e65509870a6db4e99fa5
  1. 登录flinkUi界面查看任务,如下所示:

在这里插入图片描述

  1. 此时Doris 切片表数据如下:
user_idsale_countsale_total
10018400
10026240
1003150
  1. [Mysql]-业务数据新增了两条订单数据,如下:
订单ID客户ID产品ID销售日期销售数量销售额
1100120012022-01-013150
2100220022022-01-02280
3100320012022-01-03150
4100120032022-01-045250
5100220022022-01-054160
6100320012022-01-06150
7100420012022-01-06150
  1. 此时Doris 切片表数据如下:
user_idsale_countsale_total
10018400
10026240
10032100
1004150

六、总结

本文详细介绍了实时数仓同步中切片表的设计与实现。首先,分析了业务背景和需求,说明了切片表的作用和必要性。然后,介绍了基于 FlinkCDC 和 Doris 的技术架构,并比较了不同的设计方案。针对不同的需求场景,提出了三种具体的实现方案:FlinkCDC + FlinkSQL状态计算、FlinkCDC + Doris Aggregate 模型以及 FlinkCDC + 实时表,并分析了它们的优缺点。最后,为了给读者带来不同体验选择了 FlinkCDC + FlinkSQL状态计算 方案进行实现,并详细介绍了实时同步逻辑和相关的技术细节。

通过本文的阅读,读者可以了解到实时数仓同步中切片表的设计与实现方法,以及不同方案的选择和比较。同时,本文还提供了相关资料和参考链接,方便读者进一步深入学习和研究。

七、相关资料

  • Flink实时数仓同步:实时表实战详解
  • Doris Aggregate 模型
  • Flink Doris Connector
  • FlinkCDC Pipeline
  • FlinkSQL 客户端
  • Flink Run jar 模式
  • Doris 源码内置转换工具
  • Doris & Flink Column Type Mapping
  • Mysql CDC Data Type Mapping

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/518513.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

openEuler全球生态合作研讨会:共话全球技术创新,共建国际产业生态

2024年2月27日,OpenAtom openEuler(简称“openEuler”)全球生态合作研讨会在西班牙巴塞罗那成功举办。开放原子开源基金会副秘书长辛晓华先生,开放原子开源基金会开源安全委员会副主席任旭东先生,Eclipse基金会首席会员…

基于AFDPF主动频率偏移法的孤岛检测Simulink仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 5.完整工程文件 1.课题概述 基于AFDPF主动频率偏移法的孤岛检测Simulink仿真。 2.系统仿真结果 3.核心程序与模型 版本:MATLAB2022a 36 4.系统原理简介 在分布式发电系统中,孤…

JVM-整体结构原理深度解析

JVM定义 JVM是Java Virtual Machine(Java虚拟机)的缩写,JVM是一种用于计算设备的规范,它是一个虚构出来的计算机,是通过在实际的计算机上仿真模拟各种计算机功能来实现的。 引入Java语言虚拟机后,Java语言在…

Android14 Handle机制

Handle是进程内部, 线程之间的通信机制. handle主要接受子线程发送的数据, 并用此数据配合主线程更新UI handle可以分发Message对象和Runnable对象到主线程中, 每个handle实例, 都会绑定到创建他的线程中, 它有两个作用,: (1) 安排消息在某个主线程中某个地方执行 (2) 安排…

从安卓转战月薪6万的鸿蒙原来这么简单

近年来,各家大厂正在积极布局鸿蒙客户端开发,鸿蒙操作系统备受瞩目,不少安卓开发者纷纷转战鸿蒙,并取得了可观的经济回报。本文将为大家揭示,从安卓转战鸿蒙并获得月薪6万的简单之道,希望能给正在考虑转型的…

【center-loss 中心损失函数】 参数与应用

文章目录 前言简单总结一下参数对比解释参数权重衰减(L2正则化)动量其他参数运行 前言 之前我们已经完全弄明白了中心损失函数里的代码是什么意思,并且怎么用的了,现在我们来运行它。 论文:https://ydwen.github.io/…

基于Springboot免费搭载轻量级阿里云OSS数据存储库(将本地文本、照片、视频、音频等上传云服务保存)

一、注册阿里云账户 打开https://www.aliyun.com/,申请阿里云账户并完成实名认证(个人)。这种情况就是完成了: 二、开通OSS服务 点击立即开通即可。 三、创建Bucket 申请id和secert: 进去创建一个Accesskey就会出现以…

Linux 实现打印彩色进度条

文章目录 预备知识一、理解回车换行二、认识行缓冲1、代码一、二(回车换行理解)2、代码三、四(sleep函数和ffush函数理解) 三、简单倒计时1. 倒计时代码2、效果展示 四、进度条1、效果展示2、进度条代码makefileProcessBar.hProce…

前端如何上传图片给后台?如何传递 multipart/form-data 类型的数据?图片大小、格式检查?

1. 如何上传图片? 图片上传需要传二进制流,请求头的 content-type 类型需为 multipart/form-data,传递的格式如下图所示 前后端交互通常为: 先调用接口上传二进制流图片然后再上传表单其他内容(第一步通常会返回后台…

从 HPC 到 AI:探索文件系统的发展及性能评估

随着 AI 技术的迅速发展,模型规模和复杂度以及待处理数据量都在急剧上升,这些趋势使得高性能计算(HPC)变得越来越必要。HPC 通过集成强大的计算资源,比如 GPU 和 CPU 集群,提供了处理和分析大规模数据所需的…

Canvas笔记04:绘制九大基本图形的方法,重头戏是贝塞尔曲线

hello,我是贝格前端工场,最近在学习canvas,分享一些canvas的一些知识点笔记,本期分享canvas绘制图形的知识,欢迎老铁们一同学习,欢迎关注,如有前端项目可以私信贝格。 Canvas是HTML5中的一个绘…

1分钟帮你快速搞定遥测终端机RTU选型!

遥测终端机RTU-MGTR-W系列 精准应对贴心服务每一个关键场景 MGTR-W系列遥测终端机在水利水务领域有着广泛的应用,能够满足各种细分场景的需求。该系列终端机助力实现灌区信息化、高标准农田建设、农村供水信息化、水库雨水情监测、大坝安全监测、地下水监测以及水…