【spark】SparkSQL

目录

  • SparkSQL
    • 01.快速入门
      • 什么是SparkSQL
      • 为什么学习SparkSQL
      • SparkSQL的特点
      • SparkSQL发展历史-前身Shark框架
      • SparkSQL发展历史
    • 02.SparkSQL概述
      • SparkSQL和Hive的异同
      • SparkSQL的数据抽象
      • DataFrame概述
      • SparkSession对象
    • 03.DataFrame入门和操作
      • DataFrame的组成
        • DataFrame的代码构建-基于RDD-1
        • DataFrame的代码构建-基于RDD-2
        • DataFrame的代码构建-基于RDD-3
        • DataFrame的代码构建-基于Pandas的DataFrame
        • DataFrame的代码构建-读取外部数据-text
        • DataFrame的代码构建-读取外部数据-json
        • DataFrame的代码构建-读取外部数据-csv
        • DataFrame的代码构建-读取外部数据-parquet
      • DataFrame的入门操作
      • SparkSQL数据清洗API
      • DataFrame数据写出
    • 04.SparkSQL函数定义
      • SparkSQL定义UDF
      • SparkSQL使用窗口函数
    • 05.SparkSQL的运行流程
      • SparkSQL的自动优化
      • Catalyst优化器
      • SparkSQL的执行流程
    • 06.SparkSQL整合Hive
      • Hive执行流程
      • SparkOn Hive
    • 07.分布式SQL引擎配置

SparkSQL

01.快速入门

什么是SparkSQL

SparkSQL是Spark的一个模块,用于处理海量结构化数据

为什么学习SparkSQL

SparkSQL是非常成熟的海量结构化数据处理框架:
学习SparkSQL主要在2个点:

  • SparkSQL本身十分优秀,支持SQL语言、性能强、可以自动优化、API简单、兼容HIVE等等
  • 企业大面积在使用SparkSQL处理业务数据
    1、离线开发
    2、数仓搭建
    3、科学计算
    4、数据分析

SparkSQL的特点

  • 融合性:SQL可以无缝集成在代码中,随时用SQL处理数据
  • 统一数据访问:一套标准API可读写不同数据源
  • Hive兼容:可以使用SparkSQL直接计算并生成Hive数据表
  • 标准化连接:支持标准化JDBC/ODBC连接,方便和各种数据源进行数据交互

SparkSQL发展历史-前身Shark框架

在这里插入图片描述

SparkSQL发展历史

在这里插入图片描述

02.SparkSQL概述

SparkSQL和Hive的异同

相同点:
1、分布式SQL计算引擎
2、构建大规模结构化数据计算的绝佳利器,同时SparkSQL拥有更好的性能

不同点:
在这里插入图片描述

SparkSQL的数据抽象

1、SparkSQL-DataFrame

- 二维表数据结构
- 分布式结构集合(分区)

2、SparkSQL FOor JVM-DataSet【可用于Java\Scala\语言】
3、SparkSQL For Python\R-DataFrame【可用于Java\Scale\Python\R】

DataFrame概述

在这里插入图片描述

在这里插入图片描述
DataFrame是按照二维表格的形式存储数据
RDD则是存储对象本身

SparkSession对象

在RDD阶段,程序的执行入口对象是SparkContext
在Sparke2.0后,推出SparkSeaaion对象,作为Spark编码的统一入口对象

SparkSession对象可以:
1、用于SparkSQL编程作为入口对象
2、用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
在这里插入图片描述

03.DataFrame入门和操作

DataFrame的组成

DataFrame是一个二维表结构,那么表格结构就有无法绕开的三个点:

  • 表结构表述
    比如MySQL中的一张表:

  • 由许多行组成

  • 数据也可以被分成多个列

  • 表也有表结构信息(列、列名、列类型、列约束等)
    在结构层面上:

  • StructType对象描述整个DataFrame的表结构

  • StructField对象描述一个列的信息
    在数据层面上:

  • Row对象记录一行数据

  • Column对象记录一列数据并包含列的信息

在这里插入图片描述
在表结构层面,DataFrame的表结构由:
StructType描述:

struct_type = StructType().\add("id",IntegerType(),False).\add("name",StringType(),True).\add("age",IntegerType(),False)

一个StructField记录:列名、列类型、列是否运行为空
多个StructField组成一个StructedType对象
一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空

一行数据描述为Row对象,如Row(1,张三,11)
一列数据描述为Column对象,Column对象包含一列数据和列的信息

DataFrame的代码构建-基于RDD-1
#coding:utf8from pyspark.sql import SparkSessionif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContextrdd = sc.testFile("../data/input/sql/people.txt).map(lambda x:x.spalit(",")).map(lambda x:(x[0],int(x[1])))#2.构建DataFrame对象## 参数一:被转换的rdd## 参数二:指定列名,通过list的形式指定,按照顺序依次提供字符串名称即可df = spark.createDataFrame(rdd,schema=['name','age'])df.printSchema()## 参数一:表示 展示出来多少条数据,默认不传的话是20## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替## 如果给False,表示不截断全部显示,默认是Truedf.show(20,False)
DataFrame的代码构建-基于RDD-2
#coding:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerTypeif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContextrdd = sc.testFile("../data/input/sql/people.txt).map(lambda x:x.spalit(",")).map(lambda x:(x[0],int(x[1])))schema=StructType().add('name',StringType(),True).add('age'IntegerType(),False)df = spark.createDataFrame(rdd,schema)df.printSchema()## 参数一:表示 展示出来多少条数据,默认不传的话是20## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替## 如果给False,表示不截断全部显示,默认是Truedf.show(20,False)
DataFrame的代码构建-基于RDD-3

该方法用于对数据类型不敏感

#coding:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerTypeif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContextrdd = sc.testFile("../data/input/sql/people.txt).map(lambda x:x.spalit(",")).map(lambda x:(x[0],int(x[1])))# toDF的方式构建DataFramedf1 = rdd.toDF(['name','age'])# 方法二schema=StructType().add('name',StringType(),True).add('age'IntegerType(),False)rdd.toDF(schema)df1.printSchema()## 参数一:表示 展示出来多少条数据,默认不传的话是20## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替## 如果给False,表示不截断全部显示,默认是Truedf1.show(20,False)
DataFrame的代码构建-基于Pandas的DataFrame
#coding:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StringType,IntegerTypeif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContext# 基于Pandas的DataFrame构建SparkSQL的DataFrame对象pdf = pd.DataFrame({'id':[1,2,3],'name':['张大仙','王小小','王大锤'],'age':[11,11,11]})# 将Pandas的DF对象转换成SparkDFdf1 = spark.createDataFrame(pdf)df1.printSchema()## 参数一:表示 展示出来多少条数据,默认不传的话是20## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替## 如果给False,表示不截断全部显示,默认是Truedf1.show(20,False)
DataFrame的代码构建-读取外部数据-text

构建StructType,text数据源,读取数据的特点是,是将一整行只作为一个列读取,默认列名是value 类型是String

spark session.read.format(“text|csv|json|parquet|orc|avro|jdbc…”)
.option(“k”,“v”)#option可选
.schema(StructType|String)#STRING的语法如。Schema(“name STRING”,“age INT” )
.load(“被读取文件的路径,支持本地文件系统和HDFS”)

#coding:utf8from pyspark.sql import SparkSessionif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContext# 构建StructType,text数据源,读取数据的特点是,是将一整行只作为一个列读取,默认列名是value 类型是Stringschema = StructType().add('data',StirngType(),True)df = spark.read.format('text').schema(schema=schema).load('../data/input/sql/people.txt')df.printSchema()## 参数一:表示 展示出来多少条数据,默认不传的话是20## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替## 如果给False,表示不截断全部显示,默认是Truedf.show(20,False)
DataFrame的代码构建-读取外部数据-json

json类型自带有Schema信息

#coding:utf8from pyspark.sql import SparkSessionif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContext# json类型自带有Schema信息schema = StructType().add('data',StirngType(),True)df = spark.read.format('json').load('../data/input/sql/people.txt')df.printSchema()## 参数一:表示 展示出来多少条数据,默认不传的话是20## 参数二:表示是否对列进行截断,如果列的数据长度超过20个字符串长度,后续的内容不显示以。。。代替## 如果给False,表示不截断全部显示,默认是Truedf.show(20,False)
DataFrame的代码构建-读取外部数据-csv
#coding:utf8from pyspark.sql import SparkSessionif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContext# json类型自带有Schema信息schema = StructType().add('data',StirngType(),True)df = spark.read.format('csv').\option('sep',';').\option('header',True).\option('encoding','utf-8').\schema('name STRING age INT,job STRING').\load('../data/input/sql/people.txt')df.printSchema()df.show(20,False)
DataFrame的代码构建-读取外部数据-parquet

parquet:是spark中常用的一种列式存储文件格式,和Hive中ORC差不多,他俩都是列存储格式

parquet对比普通文本文件的区别

  • parquet内置schema(列名、列类型、是否为空)
  • 存储是以列作为存储格式
  • 存储时序列化存储在文件中的,有压缩属性体积小
#coding:utf8from pyspark.sql import SparkSessionif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContext# parquet类型自带有Schema信息schema = StructType().add('data',StirngType(),True)df = spark.read.format('parquet').load('../data/input/sql/people.txt')df.printSchema()df.show(20,False)

DataFrame的入门操作

DataFrame支持两种风格进行编程,分别是:

  • DSL风格:DataFrame的特有API,调用API的方式来处理Data
#coding:utf8from pyspark.sql import SparkSessionif __name__ == '__main__':# 0.构建执行环境入口对象SparkSessionspark = SparkSesion.builder.\appName("test").\master("local[*]").\getOrcreate()#1.基于RDD转换成DataFramesc = spark.sparkContext# parquet类型自带有Schema信息df = spark.read.format('csv').load('../data/input/sql/people.txt')# column对象的获取id_column = df['id']subject_column = df['subject']# DSL风格演示df.select(["id","subject"]).show()df.select ("id","subject").show()df.select(id_column,subject_column) # filter APIdf.filter("score < 99").show()df.filter(df['score'] < 99).show()# where APIdf.where("score < 99").show()df.where(df['score'] < 99).show()# group by APIdf.groupBy("subject").count().show()df.groupBy(df['subject']).count().show()
  • SQL风格:spark.sql(“select * from XXX”)
    使用sparj.sql()来执行SQL语句查询,结果返回一个DataFrame
df.createTempView("score") #注册一个临时视图
df.createOrReplaceTempView("socre") #注册一个临时表,如果存在,进行替换
df.createGlobalTempView("score") # 注册一个全局表

全局表:跨sparksession对象使用,在一个程序内的多个sparkSession中均可调用,查询前带上前缀
global_temp.

SparkSQL数据清洗API

  • 去重方法:dropDuplicates
  • 缺失值处理:
    • dropna 是可以对缺失值进行删除;只要列中有null 就删除这一行数据
      参数:thread=3表示,至少满足3个有效列,不满足就删除当前数据
    • fillna(“loss”) 对缺失值的列进行填充
    • fillna(“N/A”,subset=[‘job’])指定列进行填充
    • fillna({‘name’:‘未知姓名’,‘age’:1,‘job’:‘worker’})设定一个字典,对所有的列提供填充规则

DataFrame数据写出

SparkSQL 统一API写出DataFrame数据
df.write.mode().format().option(K,V).save(PATH)

  • mode,传入模式字符串可选:append追加,overwrite覆盖,ignore忽略,error重复就报异常(默认的)
  • format,传入格式字符串,可选:text,csv,json,parquet,orc,avro,jdbc
  • save 写出的路径,支持本地文件和HDFS

04.SparkSQL函数定义

SparkSQL定义UDF

pyspark UDF

SparkSQL使用窗口函数

  • 聚合开窗函数

  • 排序开窗函数
    – ROW_NUMBER() OVER()
    –DENSE_RANK() OVER()
    –RANK() OVER()

  • NTILE分组窗口

05.SparkSQL的运行流程

SparkSQL的自动优化

RDD的运行完全是按照开发者的代码执行,如果开发者水平有限,RDD的执行效率也会收到影响
而SparkSQL会对写完的代码,执行“自动优化”,以提升代码运行效率,避免开发者水平影响到代码执行效率;依赖于:Catalyst优化器

Catalyst优化器

在这里插入图片描述
STEP1:解析SQL,并生成AST(抽象语法树)
在这里插入图片描述在这里插入图片描述
在这里插入图片描述

大方面的优化点有2个:

  • 谓词下推、断言下推:将逻辑判断提前到前面,以减少shuffle阶段的数据量
  • 列值剪裁:将加载的列进行剪裁,尽量减少被处理数据的宽度

SparkSQL的执行流程

06.SparkSQL整合Hive

Hive执行流程

在这里插入图片描述

SparkOn Hive

在这里插入图片描述

Spark On Hive就是因为Spark自身没有元数据管理功能,所以使用Hive的Metastore服务做为元数据管理服务。计算有Spark执行

07.分布式SQL引擎配置

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/415012.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

探索FTP:原理、实践与安全优化

引言 在正式开始讲解之前&#xff0c;首先来了解一下文件存储的类型有哪些。 DAS、SAN和NAS是三种不同的存储架构&#xff0c;分别用于解决不同场景下的数据存储需求。 DAS (Direct Attached Storage 直接附加存储)&#xff1a;DAS 是指将存储设备&#xff08;如硬盘&#x…

Python Django编写接口并用Jmeter测试的方法

一、环境准备 python3.6.7Pycharm 二、创建项目 我这里是在Django项目中新建了个APP&#xff0c;目录结构如下图所示&#xff1a; 那么怎么在已有的Django项目中新建APP并进行配置呢&#xff1a; 2.1、新建app a、可以在终端输入命令&#xff1a;python manage.py startap…

解密OceanBase数据库引擎:探秘数据的深海奥秘

目录 1、引言 1.1 数据库引擎的重要性 1.2 OceanBase数据库引擎的背景和意义 2、OceanBase数据库引擎的基本概述 2.1 数据库引擎的定义和功能 2.2 OceanBase数据库引擎的特点和优势 3、OceanBase数据库引擎的架构和设计 3.1 分布式架构的概念和原理 3.2 OceanBase数据…

某马头条——day05

文章定时发布 实现方案对比 实现方案 延迟队列服务实现 按照文档进行项目的导入并准备数据库表导入对应实体类和nacos配置中心 乐观锁集成 redis集成和测试 成功集成通过测试 添加任务 ①&#xff1a;拷贝mybatis-plus生成的文件&#xff0c;mapper ②&#xff1a;创建task类…

C语言编译和链接

翻译环境和运行环境 在ANSI C的任何一种实现中&#xff0c;存在两个不同的环境 .第一种是翻译环境&#xff0c;在这个环境中源代码被转换为可执行的机器指令 .第二种是执行环境&#xff0c;它用于实际执行代码 翻译环境 翻译环境是由编译和链接两个大过程组成&#xff0c;而…

SMT回流焊工艺之回流温度曲线

引言 在SMT生产流程中&#xff0c;如何控制回焊炉的温度是非常重要的一环&#xff0c;好的炉温曲线图意味着可以形成良好的焊点。 上一期分享&#xff08;SMT回流焊温度解析之锡膏焊接特性&#xff09;中&#xff0c;我们着重介绍了SMT回流工艺中的锡膏焊接部分。本期内容主要…

docker容器和常用命令

1.什么是容器 容器是隔离的环境中运行的一个 进程 , 如果进程结束 , 容器就会停止. 细致: 容器的隔离环境 , 拥有自己的 ip 地址 , 系统文件 , 主机名 , 进程管理 , 相当于一个 mini的系统 2.容器 vs 虚拟机 3.Docker极速上手指南 #1.安装相关依赖. sudo yum install -y …

十三、Three场景物体增加发光特效

物体发光效果非常炫酷,本期来讲three场景内物体自带发光效果怎么来实现。本次使用的是threejs138版本,在vue3+vite+ant的项目中使用。 下面来看看实现的效果。绿色罐体有了明显的发光效果。 实现步骤 增加composer.js import { UnrealBloomPass } from three/examples/jsm/po…

[C++] external “C“的作用和使用场景(案例)

C++中extern "C"的作用是什么? 在 C++ 中,extern "C" 的作用是告诉编译器按照 C 语言的规范来处理函数名和变量名。这是因为 C++ 编译器会对函数名和变量名进行名称修饰(name mangling),以区分不同的函数和变量。而在 C 语言中,函数名和变量名不会被名…

高性能CMOS模拟多路复用器(DG408DQ-T1-E3)

DG408DQ-T1-E3是一个8通道单端模拟多路复用器设计用于将八个输入中的一个连接到公共输出 如由3位二进制地址&#xff08;A0&#xff0c;A1&#xff0c;A2&#xff09;所确定的。 DG408DQ-T1-E3通道上的电流在两个通道中都传导得同样好方向。在关闭状态下&#xff0c;每个通道…

可视化k8s页面(Kubepi)

Kubepi是一个简单高效的k8s集群图形化管理工具&#xff0c;方便日常管理K8S集群&#xff0c;高效快速的查询日志定位问题的工具 随便在哪个节点部署&#xff0c;我这里在主节点部署 docker pull kubeoperator/kubepi-server docker run --privileged -itd --restartunless-st…

Ceph分布式存储(1)

目录 一.ceph分布式存储 Ceph架构&#xff08;自上往下&#xff09; OSD的存储引擎&#xff1a; Ceph的存储过程&#xff1a; 二. 基于 ceph-deploy 部署 Ceph 集群 20-40节点上添加3块硬盘&#xff0c;一个网卡&#xff1a; 10节点为admin&#xff0c;20-40为node&…