【大数据】Flink SQL 语法篇(二):WITH、SELECT WHERE、SELECT DISTINCT

Flink SQL 语法篇(二)

  • 1.WITH 子句
  • 2.SELECT & WHERE 子句
  • 3.SELECT DISTINCT 子句

1.WITH 子句

应用场景(支持 Batch / Streaming):With 语句和离线 Hive SQL With 语句一样的,语法糖 +1,使用它可以让你的代码逻辑更加清晰。

-- 语法糖 +1
WITH orders_with_total AS (SELECT order_id, price + tax AS totalFROM Orders
)
SELECT order_id, SUM(total)
FROM orders_with_total
GROUP BY order_id;

2.SELECT & WHERE 子句

应用场景(支持 Batch / Streaming):SELECT & WHERE 语句和离线 Hive SQL 语句一样的,常用作 ETL,过滤,字段清洗标准化。

INSERT INTO target_table
SELECT * FROM OrdersINSERT INTO target_table
SELECT order_id, price + tax FROM OrdersINSERT INTO target_table
-- 自定义 Source 的数据
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1))  AS t (order_id, price)INSERT INTO target_table
SELECT price + tax FROM Orders WHERE id = 10-- 使用 UDF 做字段标准化处理
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
-- 过滤条件
Where id > 3

其实理解一个 SQL 最后生成的任务是怎样执行的,最好的方式就是理解其语义。

以下面的 SQL 为例,我们来介绍下其在离线中和在实时中执行的区别,对比学习一下,大家就比较清楚了。

INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
Where id > 3

这个 SQL 对应的实时任务,假设 Orders 为 Kafka,target_table 也为 Kafka,在执行时,会生成三个算子:

  • 数据源算子From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Orders Kafka 中一条一条的读取数据,然后一条一条发送给下游的 过滤和字段标准化算子
  • 过滤和字段标准化算子Where id > 3PRETTY_PRINT(order_id)):接收到上游算子发的一条一条的数据,然后判断 id > 3 ?,将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,一条一条将计算结果数据发给下游 数据汇算子
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中。

可以看到这个实时任务的所有算子是以一种 Pipeline 模式运行的,所有的算子在同一时刻都是处于 running 状态的,24 小时一直在运行,实时任务中也没有离线中常见的分区概念。

在这里插入图片描述

⭐ 关于看如何看一段 Flink SQL 最终的执行计划:最好的方法就如上图,看 Flink Web UI 的算子图,算子图上详细的标记清楚了每一个算子做的事情。

以上图来说,我们可以看到主要有三个算子:

  • Source 算子。Source: TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, name]) -> Calc(select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]) -> WatermarkAssigner(rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)])
    • 其中 Source 表名称为 table=[[default_catalog, default_database, Orders]
    • 字段为 select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]
    • Watermark 策略为 rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]
  • 过滤算子。Calc(select=[order_id, name, row_time], where=[(order_id > 3)]) -> NotNullEnforcer(fields=[order_id])
    • 其中过滤条件为 where=[(order_id > 3)]
    • 结果字段为 select=[order_id, name, row_time]
  • Sink 算子。Sink: Sink(table=[default_catalog.default_database.target_table], fields=[order_id, name, row_time])
    • 其中最终产出的表名称为 table=[default_catalog.default_database.target_table]
    • 表字段为 fields=[order_id, name, row_time]

可以看到 Flink SQL 具体执行了哪些操作是非常详细的标记在算子图上。所以小伙伴萌一定要学会看算子图,这是掌握 Debug、调优前最基础的一个技巧。

那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个类似的算子(虽然实际可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),离线和实时任务的执行方式完全不同:

  • 数据源算子From Order):数据源从 Orders Hive 表(通常都是读一天、一小时的分区数据)中一次性读取所有的数据,然后将读到的数据全部发给下游 过滤和字段标准化算子,然后 数据源算子 就运行结束了,释放资源了。
  • 过滤和字段标准化算子Where id > 3PRETTY_PRINT(order_id)):接收到上游算子的所有数据,然后遍历所有数据判断 id > 3 ?,将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,将所有数据发给下游 数据汇算子,然后 过滤和字段标准化算子 就运行结束了,释放资源了
  • 数据汇算子INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 表中,然后整个任务就运行结束了,整个任务的资源也就都释放了

可以看到离线任务的算子是分阶段(Stage)进行运行的,每一个 Stage 运行结束之后,然后下一个 Stage 开始运行,全部的 Stage 运行完成之后,这个离线任务就跑结束了。

注意:很多小伙伴都是之前做过离线数仓的,熟悉了离线的分区、计算任务定时调度运行这两个概念,所以在最初接触 Flink SQL 时,会以为 Flink SQL 实时任务也会存在这两个概念,这里博主做一下解释:

  • 分区概念:离线由于能力限制问题,通常都是进行一批一批的数据计算,每一批数据的数据量都是有限的集合,这一批一批的数据自然的划分方式就是时间,比如按小时、天进行划分分区。但是 在实时任务中,是没有分区的概念的,实时任务的上游、下游都是无限的数据流。
  • 计算任务定时调度概念:同上,离线就是由于计算能力限制,数据要一批一批算,一批一批输入、产出,所以要按照小时、天定时的调度和计算。但是 在实时任务中,是没有定时调度的概念的,实时任务一旦运行起来就是 24 小时不间断,不间断的处理上游无限的数据,不简单的产出数据给到下游。

3.SELECT DISTINCT 子句

应用场景(支持 Batch / Streaming):语句和离线 Hive SQL SELECT DISTINCT 语句一样的,用作根据 key 进行数据去重。

INSERT into target_table
SELECT DISTINCT id 
FROM Orders

也是拿离线和实时做对比。

这个 SQL 对应的实时任务,假设 Orders 为 kafka,target_table 也为 Kafka,在执行时,会生成三个算子:

  • 数据源算子From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Orders Kafka 中一条一条的读取数据,然后一条一条发送给下游的 去重算子
  • 去重算子DISTINCT id):接收到上游算子发的一条一条的数据,然后判断这个 id 之前是否已经来过了,判断方式就是使用 Flink 中的 state 状态,如果状态中已经有这个 id 了,则说明已经来过了,不往下游算子发,如果状态中没有这个 id,则说明没来过,则往下游算子发,也是一条一条发给下游 数据汇算子
  • 数据汇算子INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中。

在这里插入图片描述
对于实时任务,计算时的状态可能会无限增长。状态大小取决于不同 key(上述案例为 id 字段)的数量。为了防止状态无限变大,我们可以设置状态的 TTL。但是这可能会影响查询结果的正确性,比如某个 key 的数据过期从状态中删除了,那么下次再来这么一个 key,由于在状态中找不到,就又会输出一遍。

那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个相同的算子(虽然可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),但是其和实时任务的执行方式完全不同:

  • 数据源算子From Order):数据源从 Orders Hive 表(通常都有天、小时分区限制)中一次性读取所有的数据,然后将读到的数据全部发给下游 去重算子,然后 数据源算子 就运行结束了,释放资源了。
  • 去重算子DISTINCT id):接收到上游算子的所有数据,然后遍历所有数据进行去重,将去重完的所有结果数据发给下游 数据汇算子,然后 去重算子 就运行结束了,释放资源了。
  • 数据汇算子INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 中,然后整个任务就运行结束了,整个任务的资源也就都释放了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/439221.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Nuget包缓存存放位置迁移

一、背景 默认情况下,NuGet会将项目中使用的包缓存到C盘,随着项目开发积累nuget包越来越多,这会逐渐挤占大量C盘空间,所以我们可以将nuget包缓存位置指定到其他盘中存放。 二、软件环境 win10、vs2022 三、查看当前缓存存放位…

【大数据】Flink 架构(三):事件时间处理

《Flink 架构》系列(已完结),共包含以下 6 篇文章: Flink 架构(一):系统架构Flink 架构(二):数据传输Flink 架构(三):事件…

[GN] 设计模式—— 创建型模式

文章目录 创建型模式单例模式 -- 确保对象唯一性饿汉式懒汉式优缺点使用场景 简单工厂模式例子:优化优缺点适用场景 工厂方法模式--多态工厂的实现例子优缺点适用场景 创建型模式 单例模式 – 确保对象唯一性 用TaskManager类。通过以下三步进行重构 为了确保Ta…

线程锁多线程的复习

线程 实现方式3种乐观锁&悲观锁线程池线程池总结 进程:是正在运行的程序 线程:是进程中的单个顺序控制流,是一条执行路径 实现方式3种 1.Thread //步骤一:定义一个继承Thread的类 //步骤二:再定义的类中重写run()方法 //步骤三:创建定义类对象 //步骤四:启动线程 class M…

【Spring实战】32 Spring Boot3 集成 Nacos 服务注册中心 并在 Gateway 网关中应用

文章目录 1. 定义2. 背景3. 功能和特性4. 下载安装5. 服务启动6. 使用示例1)服务提供者2)服务消费者3)测试 7. 代码参考结语 1. 定义 Nacos 是 Dynamic Naming and Configuration Service 的首字母简称,一个更易于构建云原生应用…

最后50个CC龙年红包封面,免费速领!还有更多......高中生也卷起Steam来了

微信视频号之前是送了我3张新年红包封面,一共是150个,但不太会操作浪费了100个,只能我自己用来送老铁了。 晓衡又做了一条 Cocos 小可爱 CC 封面红包,特别适合送女生或给小朋友们,点击视频领取!还好微信又送…

防火墙知识普及详解,使用TOR Router把TOR作为默认网关,增加隐私/匿名性

防火墙知识普及详解,使用TOR Router把TOR作为默认网关,增加隐私/匿名性。 #################### 免责声明:工具本身并无好坏,希望大家以遵守《网络安全法》相关法律为前提来使用该工具,支持研究学习,切勿用于非法犯罪活动,对于恶意使用该工具造成的损失,和本人及开发者…

嵌入式学习 Day13

一. 指针总结 1.指针概念 a.指针 --- 地址 ---内存单元编号 b.指针 --- 数据类型 ---指针类型 不同语境: 定义一个指针 //指针类型的变量 打印某个变量的指针 //指针 --地址 2.指针变量的定义 基类型 * 变量名 a.基类型 …

Pytest中doctests的测试方法应用

在 Python 的测试生态中,Pytest 提供了多种灵活且强大的测试工具。其中,doctests 是一种独特而直观的测试方法,通过直接从文档注释中提取和执行测试用例,确保代码示例的正确性。本文将深入介绍 Pytest 中 doctests 的测试方法,包括基本用法和实际案例,以帮助你更好地利用…

python3.8 安装缺少ssl、_ctypes模块解决办法

问题 安装pyhton3.8安装默认不依赖ssl 运行Flask项目时报错&#xff1a; Traceback (most recent call last):File "/usr/local/python3/bin/flask", line 8, in <module>sys.exit(main())File "/usr/local/python3/lib/python3.8/site-packages/flask…

TensorFlow2实战-系列教程9:RNN文本分类1

&#x1f9e1;&#x1f49b;&#x1f49a;TensorFlow2实战-系列教程 总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在Jupyter Notebook中进行 本篇文章配套的代码资源已经上传 1、文本分类任务 1.1 文本分类 数据集构建&#xff1a;影评数据集进行情感分析&…

[C++历练之路]C++中的继承小学问

W...Y的主页 &#x1f60a; 代码仓库分享&#x1f495; &#x1f354;前言&#xff1a; C中&#xff0c;继承是一种面向对象编程的重要概念&#xff0c;它允许一个类&#xff08;子类/派生类&#xff09;从另一个类&#xff08;父类/基类&#xff09;继承属性和方法。继承是…