Flink电商实时数仓(六)

交易域支付成功事务事实表

  1. 从topic_db业务数据中筛选支付成功的数据
  2. 从dwd_trade_order_detail主题中读取订单事实数据、LookUp字典表
  3. 关联三张表形成支付成功宽表
  4. 写入 Kafka 支付成功主题
执行步骤
  1. 设置ttl,通过Interval join实现左右流的状态管理
  2. 获取下单明细数据:用户必然要先下单才有可能支付成功,因此支付成功明细数据集必然是订单明细数据集的子集。要注意:Interval Join要求表中均为Append数据,即“只能新增,不能修改”,订单明细表数据生成过程中用到了left join,生成了回撤流,看似不满足Interval Join的条件。但是,回撤数据进入Kafka会以null值形式存在,如果用Kafka Connector将订单明细封装为动态表,null值会被过滤,最终得到的是相同主键存在重复数据的Append流(动态表本质上就是流),满足Interval Join的条件。
    • Interval join只支持事件时间,因此数据必须携带水位线;建表时水位线的相关语法为 water for order_time as order_time - interval '5' second,这里要求数据是timestamp(3)
    • 原有的时间数据类型是bigint类型的ts,使用row_time as TO_TIMESTAMP_LTZ(ts,3)这个函数即可将原有的时间数据转换为水位线所需的数据类型
  3. 筛选支付数据:
    • 支付状态为支付成功
    • 操作类型为update
  4. 构建 LookUp 字典表
  5. 联上述三张表形成支付成功宽表,写入 Kafka 支付成功主题

核心代码如下

 public void handle(StreamExecutionEnvironment env, TableEnvironment tableEnv, String groupId) {//核心业务逻辑//1. 读取TopicDB主题数据createTopicDb(groupId,tableEnv);//2. 筛选支付成功的数据,从业务数据topic_db中filterPaymentTable(tableEnv);//3. 读取下单详情表数据, 从kafka读取数据createOrderDetailTable(tableEnv, groupId);//4. 创建base.dic字典表,从HBase维度数据中读取createBaseDic(tableEnv);//tableEnv.executeSql("select * from order_detail").print();//tableEnv.executeSql("select * from base_dic").print();//tableEnv.executeSql("select to_timestamp_ltz(ts,3) from order_detail");//5. 使用interval join 完成支付成功流和订单详情数据关联intervalJoin(tableEnv);//6. 使用lookup join完成维度退化Table resultTable = lookupJoin(tableEnv);//7. 创建upsert kafka连接器写出createKafkaSink(tableEnv);resultTable.insertInto(Constant.TOPIC_DWD_TRADE_ORDER_PAYMENT_SUCCESS).execute();}

事实表动态分流

在这里插入图片描述

dwd层其他的事实表都是从topic_db中去业务数据库一张表的变更数据,按照某些过滤后写入kafka的对应主题,它们处理逻辑相似且较为简单,可以结合配置表动态分流在同一个程序中处理。有点类似我们前面实现DIM层的动态配置。

  1. 清洗过滤和转换:判断是否满足json格式,如果满足转换为jsonObj对象
  2. 读取配置表数据,使用flink-cdc读取
  3. 转换数据格式,转换到对应bean对象中
  4. 配置信息广播话,然后跟主流数据进行连接
  5. 筛选出需要的字段
  6. 根据表中的sink table字段来动态写出到对应的kafka主题中

核心代码如下

public static void main(String[] args) {new DwdBaseDb().start(10019, 4, "dwd_base_db", Constant.TOPIC_DB);}@Overridepublic void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {//核心业务逻辑//1. 读取topic_db数据//stream.print();//2. 清洗过滤和转换, jsonObjStream是主流数据SingleOutputStreamOperator<JSONObject> jsonObjStream = filterJson(stream);//jsonObjStream.print();//3. 读取配置表数据,使用flink-cdc读取,读取配置文件时并发度最好为1DataStreamSource<String> tableProcessDwd = getTableProcessDwd(env);//tableProcessDwd.print();4. 转换数据格式 string -> TableProcessDwd -> broadcastStream,广播流数据SingleOutputStreamOperator<TableProcessDwd> processDwdStream = getProcessDwdStream(tableProcessDwd);MapStateDescriptor<String, TableProcessDwd> mapStateDescriptor = new MapStateDescriptor<>("process_state", String.class, TableProcessDwd.class);BroadcastStream<TableProcessDwd> broadcastStream = processDwdStream.broadcast(mapStateDescriptor);//5. 连接主流和广播流,对主流数据进行判断是否需要保留SingleOutputStreamOperator<Tuple2<JSONObject, TableProcessDwd>> processStream = processBaseDb(jsonObjStream, broadcastStream, mapStateDescriptor);//processStream.print();//6. 筛选最后需要写出的字段SingleOutputStreamOperator<JSONObject> dataStream = filterColumns(processStream);//7. 通过sink_table的表名来动态写出到对应kafka主题//在setRecordSerializer()设置dataStream.sinkTo(FlinkSinkUtil.getKafkaSinkWithTopicName());}

gitee地址 :https://gitee.com/langpaian/gmall2023-realtime

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/300125.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[内功修炼]函数栈帧的创建与销毁

文章目录 1:什么是函数栈帧2:理解函数栈帧能解决什么问题呢3:函数栈帧的创建与销毁的解析3.1:什么是栈3.2:认识相关寄存器与汇编指令相关寄存器相关汇编指令 3.3 解析函数栈帧的创建和销毁3.3.1 预备知识3.3.2 详细解析一:调用main函数,为main函数开辟函数栈帧First:push前push…

麦肯锡产品经理问题解决流程终极指南

您是否想知道世界上最成功的产品经理如何始终如一地提供不仅满足而且超出预期的解决方案&#xff1f;秘密可能就在于世界上最负盛名的咨询公司之一麦肯锡公司所磨练的方法论。本文深入探讨了麦肯锡的问题解决流程&#xff0c;该流程专为希望提升水平的产品经理量身定制。 01. 麦…

NET中使用SQLSugar操作sqlserver数据库

目录 一、SqlSugar是什么&#xff1f; 二、迁移和建表 1.建立实体 2.创建上下文类 3.在Program中添加SqlSugar服务 4.在控制器中注入上下文类 三、简单实现CURD功能 总结 一、SqlSugar是什么&#xff1f; SqlSugar是一款老牌 .NET 开源ORM框架。 主要特点&#xff1a…

致远互联FE协作办公平台 editflow_manager.jsp SQL注入漏洞

漏洞描述 致远互联FE协作办公平台是一款为企业提供全方位协同办公解决方案的产品。它集成了多个功能模块&#xff0c;旨在帮助企业实现高效的团队协作、信息共享和文档管理。致远互联FE协作办公平台editflow_manager存在sql注入漏洞&#xff0c;攻击者可以获得敏感信息。 资产…

LeetCode-环形链表问题

1.环形链表&#xff08;141&#xff09; 题目描述&#xff1a; 给你一个链表的头节点 head &#xff0c;判断链表中是否有环。 如果链表中有某个节点&#xff0c;可以通过连续跟踪 next 指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的环&#xff0c;评测系统…

三叠云工程劳务管理,优化建筑施工管理,提升效率与质量

随着建筑行业的蓬勃发展&#xff0c;工程施工现场管理变得愈发复杂。传统的人员管理方式已经无法满足企业快速发展的需求。如何提高施工效率、优化人力资源管理成为了建筑企业亟待解决的问题。逐渐走向数字化的工程建设行业&#xff0c;急需一种足以匹配这一时代变革、高效管理…

渗透测试——1.3计算机网络基础

一、黑客术语 1、肉鸡&#xff1a;被黑客攻击电脑&#xff0c;可以受黑客控制不被发现 2、端口&#xff08;port&#xff09;&#xff1a;数据传输的通道 3、弱口令&#xff1a;强度不高&#xff0c;容易被猜到的口令、密码 4、客户端&#xff1a;请求申请电脑&#xff08;…

SQL实践篇(二):为什么微信用SQLite存储聊天记录?

文章目录 简介什么是SQLite在python中使用SQLite通过SQLite查询微信的聊天记录参考文献 简介 SQLite是一个嵌入式的开源数据库引擎&#xff0c;大小只有3M左右&#xff0c;因此我们可以将整个SQLite嵌入到应用中&#xff0c;而不再需要采用传统的客户端/服务器&#xff08;CS&…

《试题与研究》期刊发表投稿方式

《试题与研究》杂志是面向全国公开发行的国家CN级权威教育期刊。创刊以来一直以服务教育服务学生为办刊宗旨&#xff0c;以优秀的内容质量和编校质量深受广大读者好评&#xff0c;其权威性、导向性、针对性、实用性在全国教育期刊中独树一帜。为推动教育科研事业的发展&#xf…

【K8S in Action】服务:让客户端发现pod 并与之通信(2)

一 通过Ingress暴露服务 Ingress (名词&#xff09; 一一进入或进入的行为&#xff1b;进入的权利&#xff1b;进入的手段或地点&#xff1b;入口。一个重要的原因是每个 LoadBalancer 服务都需要自己的负载均衡器&#xff0c; 以及 独有的公有 IP 地址&#xff0c; 而 Ingres…

CentOS进入单用户模式

一、重启 二、出现内核选项 按“e” 三、编辑这一行 输入 rw init/sysroot/bin/sh 四、进入单用户模式 ctrlx 进入 五、切换目录 chroot /sysroot 六、然后你就操作你的系统了。 修改密码等等

httprunner环境变量

前言 我的上一篇文章讲了httprunner的基本介绍&#xff0c;这篇文章呢主要来给大家介绍httprunner中的环境变量。 一般来说&#xff0c;在进行实际应用的开发过程中&#xff0c;应用会拥有不同的运行环境&#xff0c;通常会有以下环境&#xff1a; 本地开发环境 测试环境 生产…