尚硅谷大数据项目《在线教育之实时数仓》笔记007

视频地址:尚硅谷大数据项目《在线教育之实时数仓》_哔哩哔哩_bilibili

目录

第9章 数仓开发之DWD层

P053

P054

P055

P056

P057

P058

P059

P060

P061

P062

P063

P064

P065


第9章 数仓开发之DWD层

P053

9.6 用户域用户注册事务事实表
9.6.1 主要任务

读取用户表数据,读取页面日志数据,关联两张表补全用户注册操作的维度信息,写入 Kafka 用户注册主题。

P054

9.6.4 代码

Kafka | Apache Flink

 

P055

//TODO 4 读取page主题数据dwd_traffic_page_log
//TODO 5 过滤用户表数据
//TODO 6 过滤注册日志的维度信息

P056

package com.atguigu.edu.realtime.app.dwd.db;import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @author yhm* @create 2023-04-23 17:36*/
public class DwdUserUserRegister {public static void main(String[] args) {//TODO 1 创建环境设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(4);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//TODO 2 设置表的TTL
//        tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10L));EnvUtil.setTableEnvStateTtl(tableEnv, "10s");//TODO 3 读取topic_db的数据String groupId = "dwd_user_user_register2";KafkaUtil.createTopicDb(tableEnv, groupId);
//        tableEnv.executeSql("select * from topic_db").print();//TODO 4 读取page主题数据dwd_traffic_page_logtableEnv.executeSql("CREATE TABLE page_log (\n" +"  `common` map<string,string>,\n" +"  `page` string,\n" +"  `ts` string\n" +")" + KafkaUtil.getKafkaDDL("dwd_traffic_page_log", groupId));//TODO 5 过滤用户表数据Table userRegister = tableEnv.sqlQuery("select \n" +"    data['id'] id,\n" +"    data['create_time'] create_time,\n" +"    date_format(data['create_time'],'yyyy-MM-dd') create_date,\n" +"    ts\n" +"from topic_db\n" +"where `table`='user_info'\n" +"and `type`='insert'" +"");tableEnv.createTemporaryView("user_register", userRegister);//TODO 6 过滤注册日志的维度信息Table dimLog = tableEnv.sqlQuery("select \n" +"    common['uid'] user_id,\n" +"    common['ch'] channel, \n" +"    common['ar'] province_id, \n" +"    common['vc'] version_code, \n" +"    common['sc'] source_id, \n" +"    common['mid'] mid_id, \n" +"    common['ba'] brand, \n" +"    common['md'] model, \n" +"    common['os'] operate_system \n" +"from page_log\n" +"where common['uid'] is not null \n"//"and page['page_id'] = 'register'");tableEnv.createTemporaryView("dim_log", dimLog);//TODO 7 join两张表格Table resultTable = tableEnv.sqlQuery("select \n" +"    ur.id user_id,\n" +"    create_time register_time,\n" +"    create_date register_date,\n" +"    channel,\n" +"    province_id,\n" +"    version_code,\n" +"    source_id,\n" +"    mid_id,\n" +"    brand,\n" +"    model,\n" +"    operate_system,\n" +"    ts, \n" +"    current_row_timestamp() row_op_ts \n" +"from user_register ur \n" +"left join dim_log pl \n" +"on ur.id=pl.user_id");tableEnv.createTemporaryView("result_table", resultTable);//TODO 8 写出数据到kafkatableEnv.executeSql(" create table dwd_user_user_register(\n" +"    user_id string,\n" +"    register_time string,\n" +"    register_date string,\n" +"    channel string,\n" +"    province_id string,\n" +"    version_code string,\n" +"    source_id string,\n" +"    mid_id string,\n" +"    brand string,\n" +"    model string,\n" +"    operate_system string,\n" +"    ts string,\n" +"    row_op_ts TIMESTAMP_LTZ(3) ,\n" +"    PRIMARY KEY (user_id) NOT ENFORCED\n" +")" + KafkaUtil.getUpsertKafkaDDL("dwd_user_user_register"));tableEnv.executeSql("insert into dwd_user_user_register " +"select * from result_table");}
}

P057

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_user_user_register

P058

9.7 交易域下单事务事实表
9.7.1 主要任务

从 Kafka 读取 topic_db 主题数据,筛选订单明细表和订单表数据,读取 dwd_traffic_page_log 主题数据,筛选订单页日志,关联三张表获得交易域下单事务事实表,写入 Kafka 对应主题。

P059

DwdTradeOrderDetail,TODO1 ~ TODO7

P060

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_trade_order_detail
package com.atguigu.edu.realtime.app.dwd.db;import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @author yhm* @create 2023-04-24 15:18*/
public class DwdTradeOrderDetail {public static void main(String[] args) {//TODO 1 创建环境设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//TODO 2 设置表格TTLEnvUtil.setTableEnvStateTtl(tableEnv, "10s");//TODO 3 从kafka读取业务数据topic_dbString groupId = "dwd_trade_order_detail";KafkaUtil.createTopicDb(tableEnv, groupId);//TODO 4 从kafka读取日志数据dwd_traffic_page_logtableEnv.executeSql("create table page_log(\n" +"    common map<String,String>,\n" +"    page map<String,String>,\n" +"    ts string\n" +")" + KafkaUtil.getKafkaDDL("dwd_traffic_page_log", groupId));//TODO 5 过滤订单详情表Table orderDetail = tableEnv.sqlQuery("select \n" +"    data['id'] id,\n" +"    data['course_id'] course_id,\n" +"    data['course_name'] course_name,\n" +"    data['order_id'] order_id,\n" +"    data['user_id'] user_id,\n" +"    data['origin_amount'] origin_amount,\n" +"    data['coupon_reduce'] coupon_reduce,\n" +"    data['final_amount'] final_amount,\n" +"    data['create_time'] create_time,\n" +"    date_format(data['create_time'], 'yyyy-MM-dd') create_date,\n" +"    ts\n" +"from topic_db\n" +"where `table`='order_detail'\n" +"and type='insert'");tableEnv.createTemporaryView("order_detail", orderDetail);//TODO 6 过滤订单表Table orderInfo = tableEnv.sqlQuery("select \n" +"    data['id'] id, \n" +"    data['out_trade_no'] out_trade_no, \n" +"    data['trade_body'] trade_body, \n" +"    data['session_id'] session_id, \n" +"    data['province_id'] province_id\n" +"from topic_db\n" +"where `table`='order_info'\n" +"and type='insert'");tableEnv.createTemporaryView("order_info", orderInfo);//TODO 7 获取下单日志Table orderLog = tableEnv.sqlQuery("select \n" +"    common['sid'] session_id,\n" +"    common['sc'] source_id\n" +"from page_log\n" +"where page['page_id']='order'");tableEnv.createTemporaryView("order_log", orderLog);//TODO 8 关联3张表格Table resultTable = tableEnv.sqlQuery("select \n" +"    od.id,\n" +"    od.course_id,\n" +"    od.course_name,\n" +"    od.order_id,\n" +"    od.user_id,\n" +"    od.origin_amount,\n" +"    od.coupon_reduce,\n" +"    od.final_amount,\n" +"    od.create_time,\n" +"    oi.out_trade_no,\n" +"    oi.trade_body,\n" +"    oi.session_id,\n" +"    oi.province_id,\n" +"    ol.source_id,\n" +"    ts,\n" +"    current_row_timestamp() row_op_ts \n" +"from order_detail od\n" +"join order_info oi\n" +"on od.order_id=oi.id\n" +"left join order_log ol\n" +"on oi.session_id=ol.session_id");tableEnv.createTemporaryView("result_table", resultTable);//TODO 9 创建upsert kafkatableEnv.executeSql("create table dwd_trade_order_detail( \n" +"    id string,\n" +"    course_id string,\n" +"    course_name string,\n" +"    order_id string,\n" +"    user_id string,\n" +"    origin_amount string,\n" +"    coupon_reduce string,\n" +"    final_amount string,\n" +"    create_time string,\n" +"    out_trade_no string,\n" +"    trade_body string,\n" +"    session_id string,\n" +"    province_id string,\n" +"    source_id string,\n" +"    ts string,\n" +"    row_op_ts TIMESTAMP_LTZ(3) ,\n" +"    primary key(id) not enforced \n" +")" + KafkaUtil.getUpsertKafkaDDL("dwd_trade_order_detail"));//TODO 10 写出数据到kafkatableEnv.executeSql("insert into dwd_trade_order_detail " +"select * from result_table");}
}

P061

9.8 交易域支付成功事务事实表
9.8.1 主要任务

从 Kafka topic_db主题筛选支付成功数据、从dwd_trade_order_detail主题中读取订单事实数据,关联两张表形成支付成功宽表,写入 Kafka 支付成功主题。

P062

[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_trade_pay_suc_detail
[atguigu@node001 ~]$ cd /opt/module/data_mocker/01-onlineEducation/
[atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar

P063

9.9 事实表动态分流

9.9.1 主要任务

DWD层余下的事实表都是从topic_db中取业务数据库一张表的变更数据,按照某些条件过滤后写入Kafka的对应主题,它们处理逻辑相似且较为简单,可以结合配置表动态分流在同一个程序中处理。

读取优惠券领用数据,写入 Kafka 优惠券领用主题。

P064

BaseDBApp

//TODO 1 创建环境设置状态后端

//TODO 2 读取业务topic_db主流数据

//TODO 3 清洗转换topic_db数据

//TODO 4 使用flinkCDC读取dwd配置表数据

//TODO 5 创建广播流

//TODO 6 连接两个流

//TODO 7 过滤出需要的dwd表格数据

P065

package com.atguigu.edu.realtime.app.dwd.db;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.app.func.DwdBroadcastProcessFunction;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.bean.DwdTableProcess;
import com.atguigu.edu.realtime.util.EnvUtil;
import com.atguigu.edu.realtime.util.KafkaUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerRecord;/*** @author yhm* @create 2023-04-24 18:05*/
public class BaseDBApp {public static void main(String[] args) throws Exception {//TODO 1 创建环境设置状态后端StreamExecutionEnvironment env = EnvUtil.getExecutionEnvironment(1);//TODO 2 读取业务topic_db主流数据String groupId = "base_DB_app";DataStreamSource<String> dbStream = env.fromSource(KafkaUtil.getKafkaConsumer("topic_db", groupId), WatermarkStrategy.noWatermarks(), "base_db");//TODO 3 清洗转换topic_db数据SingleOutputStreamOperator<JSONObject> jsonObjStream = dbStream.flatMap(new FlatMapFunction<String, JSONObject>() {@Overridepublic void flatMap(String value, Collector<JSONObject> out) throws Exception {try {JSONObject jsonObject = JSON.parseObject(value);String type = jsonObject.getString("type");if (!("bootstrap-start".equals(type) || "bootstrap-insert".equals(type) || "bootstrap-complete".equals(type))) {out.collect(jsonObject);}} catch (Exception e) {e.printStackTrace();}}});jsonObjStream.print();//TODO 4 使用flinkCDC读取dwd配置表数据MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("node001").port(3306).username("root").password("123456").databaseList("edu_config").tableList("edu_config.dwd_table_process")// 定义读取数据的格式.deserializer(new JsonDebeziumDeserializationSchema())// 设置读取数据的模式.startupOptions(StartupOptions.initial()).build();//TODO 5 创建广播流DataStreamSource<String> tableProcessStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "dwd_table_process");MapStateDescriptor<String, DwdTableProcess> dwdTableProcessState = new MapStateDescriptor<>("dwd_table_process_state", String.class, DwdTableProcess.class);BroadcastStream<String> broadcastDS = tableProcessStream.broadcast(dwdTableProcessState);//TODO 6 连接两个流BroadcastConnectedStream<JSONObject, String> connectStream = jsonObjStream.connect(broadcastDS);//TODO 7 过滤出需要的dwd表格数据SingleOutputStreamOperator<JSONObject> processStream = connectStream.process(new DwdBroadcastProcessFunction(dwdTableProcessState));//TODO 8 将数据写出到kafkaprocessStream.sinkTo(KafkaUtil.getKafkaProducerBySchema(new KafkaRecordSerializationSchema<JSONObject>() {@Overridepublic ProducerRecord<byte[], byte[]> serialize(JSONObject element, KafkaSinkContext context, Long timestamp) {String topic = element.getString("sink_table");element.remove("sink_table");return new ProducerRecord<byte[], byte[]>(topic, element.toJSONString().getBytes());}}, "base_db_app_trans"));//TODO 9 执行任务env.execute();}
}
[atguigu@node001 ~]$ kafka-console-consumer.sh --bootstrap-server node001:9092 --topic dwd_trade_cart_add
[atguigu@node001 ~]$ cd /opt/module/data_mocker/01-onlineEducation/
[atguigu@node001 01-onlineEducation]$ java -jar edu2021-mock-2022-06-18.jar 

启动maxwell。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/165155.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于 HTML 的一切:初学者指南

HTML 代表超文本标记语言&#xff0c;是用于创建网页和 Web 应用程序的标准语言。 本指南将全面介绍 HTML&#xff0c;涵盖从基本语法和语义到更高级功能的所有内容。 我的目标是用简单的术语解释 HTML&#xff0c;以便即使没有编码经验的人也能学习如何使用 HTML 构建网页。…

java泛型的深入 泛型还可以在很多地方进行定义 泛型类 泛型方法 泛型接口 泛型的继承和通配符 泛型类练习

文章目录 泛型的深入泛型还可以在很多地方进行定义泛型类泛型方法泛型接口 泛型的继承和通配符泛型类练习总结 泛型的深入 public static void main(String[] args) {//在没有泛型的时候怎么存储数据ArrayList listnew ArrayList();list.add(1);list.add("abc");//遍…

BAM(Bottleneck Attention Module)

BAM&#xff08;Bottleneck Attention Module&#xff09;是一种用于计算机视觉领域的深度学习模型结构&#xff0c;它旨在提高神经网络对图像的特征提取和感受野处理能力。BAM模块引入了通道注意力机制&#xff0c;能够自适应地加强或减弱不同通道的特征响应&#xff0c;从而提…

P5906 【模板】回滚莫队不删除莫队

这一题&#xff0c;虽说在洛谷标的是模板题&#xff0c;但可能没有“历史研究”那一题更加模板。 这一题相对于回滚莫队的模板题&#xff0c;可能在回滚的处理上稍微复杂了一点。对于回滚莫队就不多解释了&#xff0c;可以看一下 回滚莫队模板题 这一篇博客&#xff0c;稍微简单…

【数据结构】手撕单链表

目录 前言 1 链表 1.1 链表的概念及结构 1.2 链表的分类 1.2.1 单向或者双向 1.2.2 带头或者不带头 1.2.3 循环或者非循环 1.2.4 无头单向非循环链表 1.2.5 带头双向循环链表 2 链表的实现 2.1 结构 2.2 结点的创建 2.3 尾插 2.4 头插 2.5 尾删 2.6 头删 2.7 …

「随笔」浅谈2023年云计算的发展趋势

在2023年&#xff0c;云计算的发展趋势将受到政治、经济、社会和科技四个维度的影响。以下是对这些维度的具体分析&#xff1a; 1.1 政治维度&#xff1a; 全球政策推动&#xff1a; 随着全球各国政策对云计算的重视程度不断提高&#xff0c;云计算服务将获得更广泛的市场准入…

Python爬虫-获取汽车之家车家号

前言 本文是该专栏的第9篇,后面会持续分享python爬虫案例干货,记得关注。 地址:aHR0cHM6Ly9jaGVqaWFoYW8uYXV0b2hvbWUuY29tLmNuL0F1dGhvcnMjcHZhcmVhaWQ9MjgwODEwNA== 需求:获取汽车之家车家号数据 笔者将在正文中介绍详细的思路以及采集方法,废话不多说,跟着笔者直接往…

Vite创建React项目,另外一种更加简单的方法

在上一篇blog中一个一个安装依赖dependencies&#xff0c;有没有一步到位的方法呢&#xff0c;有! 参考《React 18 Design Patterns and Best Practices Design, build, and deploy production-ready web applications with React》4th 第一章倒数第二节Vite as a solution有个…

如何实现单病种上报的多院区/集团化/平台联动管理

背 景 米软售前人员在了解客户单病种上报的相关需求中发现&#xff0c;部分医院分为本部、分部或总院、分院等多个院区&#xff0c;各院区需共用一套系统&#xff1b;部分医院与其他兄弟医院隶属于同一集团医院&#xff0c;全集团需统一部署&#xff1b;部分市/区卫健委要求全…

休闲玩具的软文营销策略

休闲玩具行业作为新兴市场&#xff0c;具有广阔的发展前景&#xff0c;生活水平的提高带来消费观念的升级&#xff0c;城市化进程加速导致人们对休闲娱乐的需求持续上涨&#xff0c;玩具作为娱乐性、放松性、互动性的产品受到广大群体喜爱&#xff0c;休闲玩具市场的竞争也愈发…

视频特效编辑软件 After Effects 2022 mac中文版介绍 (ae 2022)

After Effects 2022 mac是一款视频特效编辑软件&#xff0c;被称为AE&#xff0c;拥有强大的特效工具&#xff0c;旋转&#xff0c;用于2D和3D合成、动画制作和视觉特效等&#xff0c;效果创建电影级影片字幕、片头和过渡&#xff0c;是一款可以帮助您高效且精确地创建无数种引…

js处理赎金信

给你两个字符串&#xff1a;ransomNote 和 magazine &#xff0c;判断 ransomNote 能不能由 magazine 里面的字符构成。 如果可以&#xff0c;返回 true &#xff1b;否则返回 false 。 magazine 中的每个字符只能在 ransomNote 中使用一次。 示例 1&#xff1a; 输入&…