Flink电商实时数仓(三)

DIM层代码流程图

维度层的重点和难点在于实时电商数仓需要的维度信息一般是动态的变化的,并且由于实时数仓一般需要一直运行,无法使用常规的配置文件重启加载方式来修改需要读取的ODS层数据,因此需要通过Flink-cdc实时监控MySql中的维度数据配置信息表,实时动态的发布广播信息。主流数据根据广播数据及时调整处理逻辑,并自动在HBase中创建相应的维度表和写入相应的维度数据。

  1. 消费Kafka ods业务主题数据
  2. 数据清洗:是否为JSON格式
  3. 使用flink-cdc读取监控配置表数据
  4. 在HBase中创建维度表
  5. 做成广播流
  6. 连接主流和广播流
  7. 筛选出需要写出的字段
  8. 写出到Hbase

在这里插入图片描述

整体架构

  • realtime-common模块
    • base: 所有Flink程序的基类,负责搭建Flink运行环境和设置并行度和检查点等相关参数。其中我们的数据来源也确定为Kafka,故数据源代码也写在这里。每个Flink程序的具体处理逻辑由handle()函数来负责处理。
    • bean:负责存放项目运行过程中需要用到的bean对象,比如当前flink-cdc程序中需要用到的TableProcessDim类,配置信息表对象。
    • constant:负责存放程序中需要使用到常量参数
    • function:负责存放一些通用的函数方法
    • util:一般存放和数据连接相关的工具类
    • test目录: 用来在写正式代码前测试连接是否通畅,数据是否可以正常发送。
  • realtime-dim模块
    • app:DimApp里面写的是dim层的具体实现,具体步骤如上述流程图所示。
    • function:负责存放数据处理的实现类,一般会继承相应的父类,在dim层可以直接调用这里的子类来实现父类接口,让dim层的代码逻辑更加清晰。
  • realtime-dwd模块:如上
  • realtime-dws模块:如上

在这里插入图片描述

数据清洗ETL

数据清洗,简单来说就是对数据进行简单的转换筛选。首先如果在转换过程中出现异常,直接过滤掉。注意这里无需抛出异常,因为如果throw a exception会导致整个程序异常终止,而在数据处理过程中出现部分数据格式错误而无法正常进行格式转换是很常见的,只需将异常信息打印到控制台即可。如果转换正常,再判断是否满足以下三个条件:

  1. 数据库名为gmall
  2. 数据类型不是bootstrap-start或者bootstrap-complete
  3. data字段不是null且长度不为0

Flink-cdc读取配置表的数据

Flink中获取数据主要有两个步骤:

  1. 获取相应的数据源Source
    • 注意:在构建Flink-cdc对应的MySQLSource时,tableList参数必须是库表.表名结构
  2. 调用env.fromSource()方法将数据源的发送过来的数据转换Ds数据流,在该方法中可以设置数据的水位线。
  3. 获取到数据后,建议先打印到控制台查看数据的具体结构。
  4. 注意读取配置信息表的并发度必须设置为1;如果不为1,只能读取r操作数据,其他更新数据无法读取。
public static MySqlSource<String> getMySqlSource(String databaseName, String tableName){MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname(Constant.MYSQL_HOST).port(Constant.MYSQL_PORT).username(Constant.MYSQL_USER_NAME).password(Constant.MYSQL_PASSWORD).databaseList(databaseName) // set captured database.tableList(databaseName+"."+tableName) // set captured table.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String.startupOptions(StartupOptions.initial()).build();return mySqlSource;}

在HBase中创建维度表

数据库中的配置表数据经过Flink-cdc处理后发送到这里是json格式的字符串,这里根据数据的四种类型op在HBase中进行不同的建表删表操作,同时对数json字符数据进行转换映射处理,转换为对应的bean对象数据流。这里一个数据产生一个处理后的对象,故使用Map算子或FlatMap算子都可以。

  • op类型
    • d 代表delete,需要删除before字段中对应的表
    • c 代表create,r 代表 read,需要创建after字段中对应的表
    • u 代表update,需要先删除掉旧表,然后根据新表的字段创建一个新表
  • 创建HBase连接,创建连接是很耗费资源的行为,因此新建连接和关闭连接需要写在open和close方法中
  • HBase中想要对表进行创建和删除等DDL操作,都由Admin对象管理;如果需要对数据进行插入删除等DML操作,需要创建Table对象。详细操作细节请看相应代码即可。
public static SingleOutputStreamOperator<TableProcessDim> createHbaseTable(DataStreamSource<String> mysqlSource) {SingleOutputStreamOperator<TableProcessDim> createHBaseTable = mysqlSource.flatMap(new RichFlatMapFunction<String, TableProcessDim>() {public Connection connection ;@Overridepublic void open(Configuration parameters) throws Exception {//获取连接connection = HBaseUtil.getHBaseConnection();}@Overridepublic void close() throws Exception {//关闭连接HBaseUtil.closeHBaseConn(connection);}@Overridepublic void flatMap(String s, Collector<TableProcessDim> out){//使用读取的配置表数据,到HBase中创建与之对应的表格try {JSONObject jsonObject = JSONObject.parseObject(s);String op = jsonObject.getString("op");TableProcessDim dim;//维度表if ("d".equals(op)) {dim = jsonObject.getObject("before", TableProcessDim.class);dim.setOp(op);//当配置表发送一个D类型的数据,对应的HBase需要删除一张维度表deleteTable(dim);} else if ("c".equals(op) || "r".equals(op)) {dim = jsonObject.getObject("after", TableProcessDim.class);createTable(dim);dim.setOp(op);} else {//op = 'u', 即修改dim = jsonObject.getObject("after", TableProcessDim.class);deleteTable(dim);createTable(dim);}dim.setOp(op);out.collect(dim);} catch (Exception e) {e.printStackTrace();}}private void createTable(TableProcessDim dim) {String sinkFamily = dim.getSinkFamily();String[] split = sinkFamily.split(",");try {HBaseUtil.createHBaseTable(connection,Constant.HBASE_NAMESPACE,dim.getSinkTable(),split);} catch (IOException e) {e.printStackTrace();}}private void deleteTable(TableProcessDim dim) {try {HBaseUtil.dropHBaseTable(connection, Constant.HBASE_NAMESPACE, dim.getSinkTable());} catch (IOException e) {e.printStackTrace();}}});return createHBaseTable;}

主流连接广播流

从Flink-cdc获取的数据(gmall2023_config)是作为一个参数来控制我们对于主流即ODS层数据(gmall数据库的业务数据)的处理逻辑。gmall2023)_config库中的Table_process_dim表决定了后续程序筛选哪个表作为维度信息,并且定义了表中有哪些字段。

  1. 转换为广播流只需要调用上述得到的TableProcessDimStream的broadcast方法
  2. 使用的主流(gmall业务数据)的connect方法,得到一个连接流,然后对连接流进行process处理。
  3. 创建BroadcastProcessFunction,在里面分别有两个函数
    • processBroadcastElement():处理广播流数据
    • processElement():处理主流数据
  4. 广播流处理逻辑:
    • 读取广播状态
    • 将配置表信息写到广播状态中
    • 根据广播状态数据的op对状态做相应的修改
  5. 主流处理逻辑:
    • 查询广播状态,判断当前数据对应的表是否存在于状态中
    • 如果数据比状态来的更早,造成状态为空,需要对状态做预处理(提前从mysql中读取维表配置表信息)
    • 如果根据当前表的表名查询的状态不为空,说明该表为维度数据,使用收集器收集起来。

筛选出需要的字段

在这里插入图片描述
在维度配置信息表中的sink_column字段里定义了维度表需要的字段,使用filter算子对JsonObj里面的data字段进行过滤即可获取到想要的字段数据。

写出到Hbase

过滤后的数据流调用它的addSink方法,方法中需要传入一个SinkFunction接口类。该接口需要实现三个方法分别是:

  • open方法:获取HBase连接
  • close方法:关闭HBase连接
  • invoke方法:写入数据时调用的方法,根据jsonObj中的type做不同处理,如果是delete,需要删除对应的维度表数据;否则都是直接覆盖写入。

代码的Gitee仓库地址:https://gitee.com/langpaian/gmall2023-realtime.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/292442.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

连获4大奖项,欧科云链成Web3行业领跑者

2023年底&#xff0c;作为深耕区块链大数据且持续关注监管与合规科技的Web3企业&#xff0c;欧科云链连续斩获4大奖项&#xff0c;包括第十三届中国证券金紫荆奖、财联社“最具创新价值奖” 、界面新闻“优致雇主奖”与“好公司50”&#xff0c;成为中国Web3领域中最受关注的企…

深入浅出堆排序: 高效算法背后的原理与性能

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《linux深造日志》 《高效算法》 ⛺️生活的理想&#xff0c;就是为了理想的生活! &#x1f4cb; 前言 &#x1f308;堆排序一个基于二叉堆数据结构的排序算法&#xff0c;其稳定性和排序效率在八大排序中也…

TypeScript实战——ChatGPT前端自适应手机端,PC端

前言 「作者主页」&#xff1a;雪碧有白泡泡 「个人网站」&#xff1a;雪碧的个人网站 可以在线体验哦&#xff1a;体验地址 文章目录 前言引言先看效果PC端手机端 实现原理解释 包的架构目录 引言 ChatGPT是由OpenAI开发的一种基于语言模型的对话系统。它是GPT&#xff08;…

php学习02-php标记风格

<?php echo "这是xml格式风格" ?><script language"php">echo 脚本风格标记 </script><% echo "这是asp格式风格" %>推荐使用xml格式风格 如果要使用简短风格和ASP风格&#xff0c;需要在php.ini中对其进行配置&#…

MySQL数据库 索引

目录 索引概述 索引结构 二叉树 B-Tree BTree Hash 索引分类 索引语法 慢查询日志 索引概述 索引 (index&#xff09;是帮助MySQL高效获取数据的数据结构(有序)。在数据之外&#xff0c;数据库系统还维护着满足特定查找算法的数据结构&#xff0c;这些数据结构以某种…

Leetcod面试经典150题刷题记录 —— 矩阵篇

矩阵篇 1. 有效的数独2. 螺旋矩阵Python 3. 旋转图像Python额外开辟数组空间原地置换法 4. 矩阵置零5. 生命游戏Python 1. 有效的数独 题目链接&#xff1a;有效的数独 - leetcode 题目描述&#xff1a; 请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 &#xff0c;验…

本地开发了一个项目,将其添加到 Git 仓库

背景 如果你已经在本地开发了一个项目&#xff0c;但尚未将其添加到 Git 仓库 现在要将其添加至远程git仓库 文章目录 详细步骤在gitee上申请仓库 详细步骤 打开终端&#xff1a; 打开终端或命令提示符窗口&#xff0c;进入你的项目所在的目录。 初始化 Git 仓库&#xff1a; …

27 redis 的 sentinel 集群

前言 redis 的哨兵的相关业务功能的实现 哨兵的主要作用是 检测 redis 主从集群中的 master 是否挂掉, 单个哨兵节点识别 master 下线为主管下线, 超过 quorum 个 哨兵节点 认为 master 挂掉, 识别为 客观下线 然后做 failover 的相关处理, 重新选举 master 节点 我们这里…

陶建辉在 CIAS 2023 谈“新能源汽车的数字化”

近年&#xff0c;中国的新能源汽车发展迅猛&#xff0c;在全球竞争中表现出色&#xff0c;已经连续 8 年保持全球销量第一。在新兴技术的推动下&#xff0c;新能源汽车的数字化转型也正在加速进行&#xff0c;从汽车制造到能源利用、人机交互&#xff0c;各个环节都在进行数字化…

Android笔记(二十):JetPack DataStore 之 Proto DataStore

Jetpack DataStore 是一种数据存储解决方案&#xff0c;主要适用于小型数据的处理。它可以通过协议缓冲区存储键值对或类型化对象。DataStore 使用 Kotlin 协程和 Flow 以异步、一致的事务方式存储数据。DataStore有两种实现方式&#xff08;1&#xff09;Preferences DataStor…

“智”绘出海新航道,亚马逊云科技携手涂鸦智能助力智能家居企业全球化

随着人工智能、5G等技术的快速发展&#xff0c;智能家居行业呈现高速发展的态势。Statista数据显示&#xff0c;2022年全球智能家居行业支出总值为1145亿美元&#xff0c;欧美地区以较早的智能家居普及率&#xff0c;率先进入全屋智能时代&#xff0c;其中欧盟区国家家用智能设…

simulink代码生成(一)——环境搭建

一、安装C2000的嵌入式环境&#xff1b; 点击matlab附加功能&#xff0c; 然后搜索C2000&#xff0c;安装嵌入式硬件支持包&#xff1b;点击安装即可&#xff1b;&#xff08;目前还不知道破解版的怎么操作&#xff0c;目前我用的是正版的这样&#xff0c;完全破解的可能操作…