Spark SQL基础

一.Pandas简介

1、基本介绍

  • Pandas是Python的一个第三方包,也是商业和工程领域最流行的结构化数据工具集,用于数据清洗、处理以及分析

  • Pandas和Spark SQL中很多功能都类似,甚至使用方法都是相同的

  • Pandas适用场景

    • Pandas用于处理单机数据

    • 可以在数据ETL、查询分析、报表输出等环节使用

2.数据结构

Python中的Pandas的DataFrame数据结构:

DataFrame:表示一个二维表对象,就是表示整个表

字段,列,索引;Series表示一行或者一列 

 

二.Spark SQL函数定义

1.窗口函数

分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx]) 

分析函数大致可以分为以下三类:

1- 聚合函数:sum() count() avg() max() min()

2- row_number() rank() dense_rank() ntile()

3- first_value() last_value() lead() lag()

2.SQL函数分类

SQL函数,主要分为以下三大类:

①UDF函数:用户自定义函数

特点:一对一,输入一个得到一个

例如:split() substr()

②UDAF函数:用户自定义聚合函数

特点:多对一,输入多个得到一个

例如:sum()

③UDTF函数:用户自定义表数据生成函数

特点:一对多,输入一个得到多个

例如:explode()

在SQL中提供的所有的内置函数,都是属于以上三类中某一类函数

思考:有这么多的内置函数,为啥还需要自定义函数呢?

为了扩充函数功能。在实际使用中,并不能保证所有的操作函数都已经提前的内置好了。很多基于业务处理的功能,其实并没有提供对应的函数,提供的函数更多是以公共功能函数。此时需要进行自定义,来扩充新的功能函数

1- SparkSQL原生的时候,Python只能开发UDF函数

2- SparkSQL借助其他第三方组件,Python可以开发UDF,UDAF函数

 

在Spark SQL中,针对Python语言,对于自定义函数,原生支持的并不是特别好。目前原生仅支持自定义UDF函数,而无法自定义UDAF函数和UDTF函数。

在1.6版本后,Java 和scala语言支持自定义UDAF函数,但Python并不支持。

Spark SQL原生存在的问题:大量的序列化和反序列

 虽然Python支持自定义UDF函数,但是其效率并不是特别的高效。因为在使用的时候,传递一行处理一行,返回一行的方式。这样会带来非常大的序列化的开销的问题,导致原生UDF函数效率不好
    
早期解决方案: 基于Java/Scala来编写自定义UDF函数,然后基于python调用即可
    
目前主要的解决方案: 引入Arrow框架,可以基于内存来完成数据传输工作,可以大大的降低了序列化的开销,提供传输的效率,解决原生的问题。同时还可以基于pandas的自定义函数,利用pandas的函数优势完成各种处理操作

3.Spark原生自定义UDF函数

自定义函数流程:

 第一步:在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可

第二步:将Python函数注册到Spark SQL中

        注册方式一:udf对象 = sparkSession.udf.register(参数1,参数2,参数3)

        参数1:[UDF函数名称],此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范

        参数2:[自定义的Python函数],表示将哪个Python的函数注册为Spark SQL的函数

        参数3:[UDF函数的返回值类型],用于表示当前这个Python的函数返回的类型

        udf对象:返回值对象,是一个UDF对象,可以在DSL中使用

        说明:如果通过方式一来注册函数,[可以用在SQL和DSL]

        注册方式二:udf对象 =F.udf(参数1,参数2)

        参数1:Python函数的名称,表示将那个Python的函数注册为Spark SQL的函数

        参数2:返回值的类型,用于表示当前这个Python的函数返回的类型

        udf对象:返回值对象,是一个UDF对象,可以在DSL中使用.

        说明: 如果通过方式二来注册函数,【仅能用在DSL中】

        注册方式三:  语法糖写法  @F.udf(returnType=返回值类型)  放置到对应Python的函数上面
        说明: 实际是方式二的扩展。如果通过方式三来注册函数,【仅能用在DSL中】
    
        
第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可

4.Pandas的UDF函数

4.1 Apache Arrow框架基本介绍

Apache Arrow是Apache旗下的一款顶级的项目。是一个跨平台的在内存中以列式存储的数据层,它的设计目标就是作为一个跨平台的数据层,来加快大数据分析项目的运行效率

Pandas 与 Spark SQL 进行交互的时候,建立在Apache Arrow上,带来低开销 高性能的UDF函数

Arrow并不会自动使用,在某些情况下,需要配置 以及在代码中需要进行小的更改才可以使用

4.2 基于Arrow完成Pandas DataFrame和Spark DataFrame互转

使用场景:

1- Spark的DataFrame -> Pandas的DataFrame:当大数据处理到后期的时候,可能数据量会越来越少,这样可以考虑使用单机版的Pandas来做后续数据的分析

2- Pandas的DataFrame -> Spark的DataFrame:当数据量达到单机无法高效处理的时候,或者需要和其他大数据框架集成的时候,可以转成Spark中的DataFrame

 总结:
Pandas的DataFrame -> Spark的DataFrame: spark.createDataFrame(data=pandas_df)
Spark的DataFrame -> Pandas的DataFrame: init_df.toPandas()

 

4.3 基于Pandas完成UDF函数

基于Pandas的UDF函数来转换为Spark SQL的UDF函数进行使用。底层是基于Arrow框架来完成数据传输,允许向量化(可以充分利用计算机CPU性能)操作。

Pandas的UDF函数其实本质上就是Python的函数,只不过函数的传入数据类型为Pandas的类型

基于Pandas的UDF可以使用自定义UDF函数和自定义UDAF函数

 

第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可

第二步: 将Python函数包装成Spark SQL的函数
    注册方式一: udf对象 = spark.udf.register(参数1, 参数2)
        参数1: UDF函数名称。此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范
        参数2: Python函数的名称。表示将哪个Python的函数注册为Spark SQL的函数
        使用: udf对象只能在DSL中使用。参数1指定的名称只能在SQL中使用
        注意: 如果编写的是UDAF函数,那么注册方式一需要配合注册方式三,一起使用
        
    注册方式二: udf对象 = F.pandas_udf(参数1, 参数2)
        参数1: 自定义的Python函数。表示将哪个Python的函数注册为Spark SQL的函数
        参数2: UDF函数的返回值类型。用于表示当前这个Python的函数返回的类型对应到Spark SQL的数据类型
        udf对象: 返回值对象,是一个UDF对象。仅能用在DSL中使用
    
    注册方式三: 语法糖写法  @F.pandas_udf(returnType=返回值Spark SQL的数据类型)  放置到对应Python的函数上面
        说明: 实际是方式一的扩展。仅能用在DSL中使用
    
    
第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可

三.Spark on Hive

1.集成原理

 

HiveServer2的主要作用: 接收SQL语句,进行语法检查;解析SQL语句;优化;将SQL转变成MapReduce程序,提交到Yarn集群上运行

SparkSQL与Hive集成,实际上是替换掉HiveServer2。是SparkSQL中的HiveServer2替换掉了Hive中的HiveServer2。

集成以后优点如下:
1- 对于SparkSQL来说,可以避免在代码中编写schema信息。直接向MetaStore请求元数据信息
2- 对于SparkSQL来说,多个人可以共用同一套元数据信息,避免每个人对数据理解不同造成代码功能兼容性问题
3- 对于Hive来说,底层执行引擎由之前的MapReduce变成了Spark Core,能够提升运行效率
4- 对于使用者/程序员来说,SparkSQL与Hive集成,对于上层使用者来说,是完全透明的。

2.在代码中集成Hive

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':# 1- 创建SparkSession对象"""spark.sql.warehouse.dir:告诉Spark数据存储在什么地方。默认使用本地磁盘进行存储。推荐使用HDFShive.metastore.uris:告诉Spark元数据信息去什么地方找MetaStoreenableHiveSupport():开启SparkSQL和Hive的集成"""spark = SparkSession.builder\.config("spark.sql.warehouse.dir","hdfs://node1:8020/user/hive/warehouse")\.config("hive.metastore.uris","thrift://node1.itcast.cn:9083")\.appName('sparksql_hive')\.master('local[*]')\.enableHiveSupport()\.getOrCreate()# 2- 数据输入# 3- 数据处理# 4- 数据输出spark.sql("show databases").show()spark.sql("""select *from day07.studentwhere id>=2""").show()# 5- 释放资源spark.stop()

 总结:
spark.sql.warehouse.dir: 告知Spark,数据表存放的地方。推荐使用HDFS进行存储。如果不配置,默认使用本地磁盘存储。
hive.metastore.uris: 告知Spark,MetaStore元数据管理服务的连接信息
enableHiveSupport() : 开启Spark和Hive的集成

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/338269.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

blob分析找出感兴趣区域,求其轮廓再用差分找缺陷

*关闭程序计数器,图形变量,窗口图形更新 dev_update_off() * Image Acquisition 01: Code generated by Image Acquisition 01 ImageFiles : [] ImageFiles[0] : 1.bmp ImageFiles[1] : 2.bmp for Index : 0 to |ImageFiles| - 1 by 1*读取一张图像read_image (Image, ImageFil…

mac录屏软件有哪些?分享5个实用软件

mac录屏软件有哪些?随着Mac电脑在创意和专业领域的使用日益普及,越来越多的用户需要录制屏幕内容以进行分享、演示或后期编辑。本文将为您介绍几款知名的Mac录屏软件,让您轻松实现屏幕录制的需求。 1.迅捷屏幕录像工具 这是一款功能强大的屏…

【Xilinx FPGA】异步 FIFO 的复位

FIFO(First-In-First-Out,先入先出)是一种的存储器类型,在 FPGA 开发中通常用于数据缓存、位宽转换或者跨时钟域(多 bit 数据流)。在使用异步 FIFO 时,应注意复位信号是否遵循相关要求和规范&am…

【设计模式-02】Strategy策略模式及应用场景

一、参考资料 Java 官方文档 Overview (Java SE 18 & JDK 18)module indexhttps://docs.oracle.com/en/java/javase/18/docs/api/index.html Java中使用到的策略模式 Comparator、comparable Comparator (Java SE 18 & JDK 18)declaration: module: java.base, pa…

水和冰一起进微波炉会怎样?不会还有人不知道吧

水和冰一起放到微波炉里加热 3 分钟,水已经冒烟的情况下,冰块会化掉吗? 事实上并不会。 不信,看一下这个视频—— 这个博主把一杯水和一杯冰放到微波炉里同时加热3分钟,结果水已经开始汽化了,冰还是冰。 这…

React入门 - 06(TodoList 列表数据的新增和删除)

本章内容 目录 一、实践一下 React 的列表渲染二、TodoList 新增功能三、列表循环的 key四、删除 上一节内容我们完成了输入框中可以自由输入内容,这一节我们继续 TodoList功能的完善:列表数据的新增和删除。 在开始之前,我们先介绍一下 Re…

LeetCode 145. 二叉树的后序遍历

145. 二叉树的后序遍历 给你一棵二叉树的根节点 root ,返回其节点值的 后序遍历 。 示例 1: 输入:root [1,null,2,3] 输出:[3,2,1]示例 2: 输入:root [] 输出:[]示例 3: 输入&…

YOLOv8-Seg改进:轻量化改进 | MobileNetV3,轻量级骨架首选

🚀🚀🚀本文改进:MobileNetV3的创新点包括:使用自适应瓶颈宽度、借鉴SENet中的Squeeze-and-Excitation机制、引入h-swish激活函数等。 🚀🚀🚀YOLOv8-seg创新专栏:http://t.csdnimg.cn/KLSdv 学姐带你学习YOLOv8,从入门到创新,轻轻松松搞定科研; 1)手把手教…

Jmeter+ant+jenkins持续集成

一、环境准备 1、 jdk环境 要求JDK1.8以上,命令行输入:java -version,出现如下提示说明安装成功。 2、 Jmeter环境 下载Jmeter最新版本,解压即可,添加bin目录到环境变量。 3、 Ant环境 设置ant环境变量&#xff0…

报错解决方法——http404(Spring MVC)

一.检查静态资源是否加载成功 成功的标志就是在项目跑起来之后再target文件夹中的classes文件夹中可以找到自己写的配置文件。 1.查看resources文件夹是否被识别为资源文件夹 如图所示,文件夹图标右下角有三条杠代表被识别为资源文件 2.在pox.xml文件中插入如下…

生信技能33 - gnomAD数据库hg19/hg38 VCF文件批量下载脚本

gnomAD数据库下载地址 gnomAD downloads gnomAD v2.1.1数据集包含来自125,748个外显子组和15,708个全基因组的数据,所有这些数据都映射到GRCh 37/hg 19和GRCh 38/hg 38 两个版本的参考序列。 gnomAD数据库hg19与hg39 VCF文件批量下载脚本 download.sh # 获取当前目录路径…

本地部署 big-AGI

本地部署 big-AGI 1. big-AGI 介绍2. Github 地址3. 本地部署 big-AGI4. 访问 big-AGI5. 配置 API key6. 测试一下 1. big-AGI 介绍 欢迎使用 big-AGI 👋,这是一款面向需要功能、形式、简单性和速度的专业人士的 GPT 应用程序。 big-AGI 由 10 个供应商…