【Python】PySpark

前言

Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。

简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。

Spark对Python语言的支持,重点体现在Python第三方库:PySpark

PySpark是由Spark官方开发的Python语言第三方库。

Python开发者可以使用pip程序快速的安装PySpark并像其它第三方库那样直接使用。

在这里插入图片描述

基础准备

安装

同其它的Python第三方库一样,PySpark同样可以使用pip程序进行安装。

pip install pyspark或使用国内代理镜像网站(清华大学源)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark

构建PySpark执行环境入口对象

想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。

PySpark的执行环境入口对象是:类SparkContext的类对象

# 导包
from pyspark import SparkConf, SparkContext# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)# 打印PySpark的运行版本
print(sc.version)# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

运行需要Java环境,推荐jdk8

PySpark的编程模型

SparkContext类对象,是PySpark编程中一切功能的入口。

PySpark的编程,主要分为如下三大步骤:

在这里插入图片描述

数据输入

PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象

RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)

PySpark针对数据的处理,都是以RDD对象作为载体,即:

  • 数据存储在RDD内
  • 各类数据的计算方法,也都是RDD的成员方法
  • RDD的数据计算方法,返回值依旧是RDD对象

在这里插入图片描述

Python数据容器转RDD对象

PySpark支持通过SparkContext对象的parallelize成员方法,将list/tuple/set/dict/str转换为PySpark的RDD对象

# 导包
from pyspark import SparkConf, SparkContext# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd1 = sc.parallelize([1, 2, 3])    
rdd2 = sc.parallelize((1, 2, 3))    
rdd3 = sc.parallelize({1, 2, 3})    
rdd4 = sc.parallelize({'key1': 'value1', 'key2': 'value2'}) 
rdd5 = sc.parallelize('hello')  # 输出RDD的内容,需要使用collect()
print(rdd1.collect())   # [1, 2, 3]
print(rdd2.collect())   # [1, 2, 3]
print(rdd3.collect())   # [1, 2, 3]
print(rdd4.collect())   # ['key1', 'key2']
print(rdd5.collect())   # ['h', 'e', 'l', 'l', 'o']# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

注意:

  • 字符串会被拆分出一个个的字符,存入RDD对象
  • 字典仅有key会被存入RDD对象

读取文件转RDD对象

PySpark也支持通过SparkContext入口对象来读取文件,构建出RDD对象。

先提前预备一个txt文件

hello
python
day
# 导包
from pyspark import SparkConf, SparkContext# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.textFile('E:\\code\\py-space\\8.27\\hello.txt')# 输出RDD的内容,需要使用collect()
print(rdd.collect())    # ['hello', 'python', 'day']# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

数据计算

RDD对象内置丰富的:成员方法(算子)

map算子

将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD

rdd.map(func)
# func: f:(T) -> U
# f: 表示这是一个函数
# (T) -> U 表示的是方法的定义:()表示无需传入参数,(T)表示传入1个参数
# T是泛型的代称,在这里表示 任意类型
# U是泛型的代称,在这里表示 任意类型# (T) -> U : 这是一个函数,该函数接收1个参数,传入参数类型不限,返回一个返回值,返回值类型不限
# (A) -> A : 这是一个函数,该函数接收1个参数,传入参数类型不限,返回一个返回值,返回值类型和传入参数类型一致

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 6])# 通过map方法将全部数据乘以10,传入参数为函数
rdd2 = rdd.map(lambda x: x * 10)# 输出RDD的内容,需要使用collect()
print(rdd2.collect())   # [10, 20, 30, 40, 50, 60]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

由于map()的返回值还是RDD对象,可以继续在尾部进行链式调用

rdd3 = rdd.map(lambda x: x * 10).map(lambda x: x + 9)

flatMap算子

对RDD执行map操作,然后进行解除嵌套操作。

在这里插入图片描述

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(['a b c', 'd e f'])# 输出RDD的内容,需要使用collect()
print(rdd.map(lambda x: x.split(' ')).collect())    # [['a', 'b', 'c'], ['d', 'e', 'f']]
print(rdd.flatMap(lambda x:x.split(' ')).collect())   # ['a', 'b', 'c', 'd', 'e', 'f']# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduceByKey算子

针对KV型(二元元组)RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作

rdd.reduceByKey(func)
# func: (V, V) -> V
# 接收2个传入参数(类型要一致),返回一个返回值,返回值类型和传入参数类型要求一致

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])# 输出RDD的内容,需要使用collect()
print(rdd.reduceByKey(lambda a, b: a+b).collect())  # [('b', 3), ('a', 2)]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduceByKey中的聚合逻辑是:比如有[1,2,3,4,5],然后聚合函数是:lambda a,b: a+b

在这里插入图片描述

注意:reduceByKey中接收的函数,只负责聚合,不理会分组;分组是自动by key来分组的

filter算子

过滤想要的数据进行保留。

rdd.filter(func)
# func: (T) -> bool
# 传入一个参数任意类型,返回值必须是True/False,返回是True的数据被保留,False的数据被丢弃

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 6])# 输出RDD的内容,需要使用collect()
print(rdd.filter(lambda x: x % 2 == 0).collect())  # [2, 4, 6]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

distinct算子

对RDD数据进行去重,返回新的RDD

rdd.distinct() # 无需传参

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 3, 2, 6])# 输出RDD的内容,需要使用collect()
print(rdd.distinct().collect())  # [6, 1, 2, 3]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

sortBy算子

对RDD数据进行排序,基于你指定的排序依据。

rdd.sortKey(func, ascending=False, numPartitions=1)
# func: (T) -> U:告知按照RDD中的哪个数据进行排序,比如lambda x: x[1]表示按照RDD中的第二列元素进行排序
# ascending:True升序,False降序
# numPartitions:用多少分区排序,全局排序需要设置为1

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([('Aiw', 9), ('Tom', 6), ('Jack', 8), ('Bolb', 5)])# 输出RDD的内容,需要使用collect()
print(rdd.sortBy(lambda x: x[1], ascending=False,numPartitions=1).collect())  # [('Aiw', 9), ('Jack', 8), ('Tom', 6), ('Bolb', 5)]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

数据输出

collect算子

将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象。

rdd.collect()
# 返回值是一个List

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3])rdd_list: list = rdd.collect()print(rdd_list)   # [1, 2, 3]
print(type(rdd_list))   # <class 'list'># 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

reduce算子

对RDD数据集按照你传入的逻辑进行聚合

rdd.reduce(func)
# func:(T, T) -> T
# 传入2个参数,1个返回值,要求返回值和参数类型一致

在这里插入图片描述

示例:

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(range(1, 10))print(rdd.reduce(lambda a, b: a+b))   # 45# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

take算子

取RDD的前N个元素,组合成List进行返回。

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(range(1, 10))rdd_take: list = rdd.take(3)print(rdd_take)   # [1, 2, 3]
print(type(rdd_take))   # <class 'list'># 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

count算子

计算RDD有多少条数据,返回值是一个数字。

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(range(1, 10))rdd_count: int = rdd.count()print(rdd_count)   # 9
print(type(rdd_count))   # <class 'int'># 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

saveAsTextFile算子

将RDD的数据写入文本文件中。支持本地写出、HDFS等文件系统。

注意事项:

在这里插入图片描述

# 导包
from pyspark import SparkConf, SparkContext, sql
import os# 设置环境变量
os.environ['PYSPARK_PYTHON'] = 'D:/Python/python.exe'
os.environ['HADOOP_HOME'] = 'D:/Hadoop-3.0.0'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(range(1, 10))rdd.saveAsTextFile('./8.27/output') # 运行之前确保输出文件夹不存在,否则报错# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

上述代码输出结果,输出文件夹内有多个分区文件

修改RDD分区为1个

方式一:SparkConf对象设置属性全局并行度为1:

# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')
# 设置属性全局并行度为1
conf.set('spark.default.parallelism','1')
# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)

方式二:创建RDD的时候设置(parallelize方法传入numSlices参数为1)

rdd = sc.parallelize(range(1, 10), numSlices=1)
rdd = sc.parallelize(range(1, 10), 1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/90736.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

什么是架构,架构的本质是什么

不论是开发人员还是架构师&#xff0c;我们都一直在跟软件系统打交道&#xff0c;架构是在工作中出现最频繁的术语之一。那么&#xff0c;到底什么是架构&#xff1f;你可能有自己的答案&#xff0c;也有可能没有答案。对“架构”的理解需要我们不断在实践中思考、归纳、演绎&a…

自编码器AE全方位探析:构建、训练、推理与多平台部署

本文深入探讨了自编码器&#xff08;AE&#xff09;的核心概念、类型、应用场景及实战演示。通过理论分析和实践结合&#xff0c;我们详细解释了自动编码器的工作原理和数学基础&#xff0c;并通过具体代码示例展示了从模型构建、训练到多平台推理部署的全过程。 关注TechLead&…

虫情监测仪的功能优势有哪些?

虫情监测仪是实时监测虫情的仪器&#xff0c;主要由诱虫装置、害虫灭活装置、落虫分散装置、收集装置、图像采集装置以及农业四情测报平台/智慧农业大数据平台组成&#xff0c;能够实时拍摄虫情照片&#xff0c;并将其上传至平台进行识别统计&#xff0c;以便对虫害的发生进行预…

如何从0跑起Vue3项目(Node和npm环境配置)

文章目录 vue项目运行Node安装打开vue项目 vue项目 拥有了一个vue3项目后怎么将它跑起来&#xff1f; 期末在学长敲代码那儿做的vue课设&#xff0c;怎么将他从0跑起来&#xff1f; char[] str "如果需要做课设的小伙伴&#xff0c;也可以百度:学长敲代码" strin…

轻松解决Mac和Windows中Unity汉化问题

本文是参考https://blog.csdn.net/ChinarCSDN/article/details/83213739该文写的。 上述作者是针对windows平台来写的&#xff0c;亲测方法可用。文中有中文字体包&#xff0c;可自行下载。 其他中文包下载地址&#xff1a; https://new-translate.unity3d.jp/v1/live/54/XX…

什么是量化交易接口?(股票下单接口)特点(一)

股市领域里的量化交易接口是一种用于与金融市场进行交互的编程接口&#xff0c;它允许开发者通过计算机程序自动执行交易策略。量化交易接口通常提供以下功能&#xff1a; 1. 实时市场数据获取&#xff1a;量化交易接口通常可以提供实时的市场行情数据&#xff0c;包括股票、期…

Vue:插槽,与自定义事件

1.插槽slot <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title> </head> <body> <div id"app"><!-- <p>列表书籍</p>--> <!-- …

详细介绍如何基于ESP32实现低功耗的电子纸天气显示器--附完整源码

实现界面展示 这是一款天气显示器,由支持 wifi 的 ESP32 微控制器和 7.5 英寸电子纸(又名电子墨水)显示器供电。当前和预测的天气数据是从 OpenWeatherMap API 获取的。传感器为显示屏提供准确的室内温度和湿度。 该项目在睡眠时消耗约 14μA,在约 10 秒的清醒期…

版本控制 Git工具的使用

版本控制的概念&#xff1a; 版本控制&#xff08;Revision control&#xff09;是一种在开发的过程中用于管理我们对文件、目录或工程等内容的修改历史&#xff0c;方便查看更改历史记录&#xff0c;备份以便恢复以前的版本的软件工程技术。简单来说就是用于管理多人协同开发…

拒绝摆烂!C语言练习打卡第七天

&#x1f525;博客主页&#xff1a;小王又困了 &#x1f4da;系列专栏&#xff1a;每日一练 &#x1f31f;人之为学&#xff0c;不日近则日退 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 目录 一、选择题 &#x1f4dd;1.第一题 &#x1f4dd;2.第二题 &#x1f4d…

SpringBoot项目,执行install命名时,控制台显示:Unable to find main class

构建springboot多模块项目&#xff0c;启动时可以正常启动&#xff0c;执行了父工程的maven的clean也没问题&#xff0c;执行install的时候就报错了&#xff1a;Unable to find main class。显而易见 这个错是找不到主类。 记录下解决过程&#xff1a; 首先看自己项目的父工程…

鸿蒙系列-如何使用好 ArkUI 的 @Reusable?

如何使用好 ArkUI 的 Reusable&#xff1f; OpenHarmony 组件复用机制 在ArkUI中&#xff0c;UI显示的内容均为组件&#xff0c;由框架直接提供的称为 系统组件&#xff0c;由开发者定义的称为 自定义组件。 在进行 UI 界面开发时&#xff0c;通常不是简单的将系统组件进行组合…