分布式的计算框架之Spark(python第三方库视角学习PySpark)

基本介绍

Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎 。现在形成一个高速发展应用广泛的生态系统。

特点介绍

Spark 主要有三个特点:

首先,高级 API 剥离了对集群本身的关注,Spark 应用开发者可以专注于应用所要做的计算本身。

其次,Spark 很快,支持交互式计算和复杂算法。

最后,Spark 是一个通用引擎,可用它来完成各种各样的运算,包括 SQL 查询、文本处理、机器学习等,而在 Spark 出现之前,我们一般需要学习各种各样的引擎来分别处理这些需求。(来源百度百科)

park对python语言的支持--->PySpark

Spark对Python语言的支持,重点体现在Python的第三方库: PySpark

PySpark是由Spark官方开发的Python语言第三方库Python开发者可以使用pip程序快速的安装PySpark并像其它三方库那样直接使用。

基础准备

PySpark库的安装

在命令行中输入:pip install pyspark

使用国内代理镜像网站:pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark

PySpark执行环境入口对象的构建

想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。

PySpark的执行环境入口对象是:类SparkContext的类对象

如何通过代码获得类对象,代码如下:

  1. from pyspark import SparkConf, SparkContext

  2. # 创建SparkConf类对象

  3. conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") # 链式调用,不管调用什么样的方法,返回的都是同一个对象

  4. # 基于SparkConf类对象创建SparkContext对象

  5. sc = SparkContext(conf=conf)

  6. # 打印pyspark的运行版本

  7. print(sc.version)

  8. # 停止SparkContext对象的运行(停止pyspark程序)

  9. sc.stop()

PySpark的编程模型

PySpark的编程,主要分为如下三大步骤: 

数据输入

RDD对象

PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象

RDD全称为: 弹性分布式数据集(Resilient Distributed Datasets)

PySpark针对数据的处理,都是以RDD对象作为载体即:

(1)数据存储在RDD内

(2)各类数据的计算方法,也都是RDD的成员方法

(3)RDD的数据计算方法,返回值依旧是RDD对象

Python数据容器转RDD对象

PySpark支持通过SparkContext对象的parallelize成员方法,将list、tuple、set、dict、str转换为PySpark的RDD对象。

  1. from pyspark import SparkConf, SparkContext

  2. # 创建SparkConf类对象

  3. conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") # 链式调用,不管调用什么样的方法,返回的都是同一个对象

  4. # 基于SparkConf类对象创建SparkContext对象

  5. sc = SparkContext(conf=conf)

  6. rdd1 = sc.parallelize([1, 2, 3, 4, 5]) # 列表

  7. rdd2 = sc.parallelize((1, 2, 3, 4, 5)) # 元组

  8. rdd3 = sc.parallelize("study python") # 字符串

  9. rdd4 = sc.parallelize({1, 2, 3, 4, 5}) # 集合

  10. rdd5 = sc.parallelize({"key1": "value1", "key2": "value2", "key3": "value3"}) # 字典

  11. # 如果要查看RDD里面有什么内容,需要用collect()方法

  12. print(rdd1.collect())

  13. print(rdd2.collect())

  14. print(rdd3.collect())

  15. print(rdd4.collect())

  16. print(rdd5.collect())

  17. # # 停止SparkContext对象的运行(停止pyspark程序)

  18. sc.stop()

注意:

(1)字符串会被拆分出1个个的字符,存入RDD对象字典

(2)仅有key会被存入RDD对象

读取文件转RDD对象

PySpark支持通过SparkContext入口对象,来读取文件并构建出RDD对象

前面逻辑都是一样的,只是调用方法不一样,需要使用sc.textFile

  1. from pyspark import SparkConf, SparkContext

  2. # 创建SparkConf类对象

  3. conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") # 链式调用,不管调用什么样的方法,返回的都是同一个对象

  4. # 基于SparkConf类对象创建SparkContext对象

  5. sc = SparkContext(conf=conf)

  6. rdd = sc.textFile("G:\资料\2011年1月销售数据.txt")

  7. # 如果要查看RDD里面有什么内容,需要用collect()方法

  8. print(rdd.collect())

  9. # # 停止SparkContext对象的运行(停止pyspark程序)

  10. sc.stop()

数据计算

PySpark的数据计算,都是依赖RDD对象内置丰富的“成员方法(算子)”来进行的

因为spark是一个分布式程序,内部运行机制比较复杂,暂不讨论,我们只需要知道在python中运行spark程序时,需要额外增加以下代码,否则会报错:spark找不到python程序

  1. # 添加'PYSPARK_PYTHON'和'PYSPARK_DRIVER_PYTHON'的执行环境

  2. os.environ['PYSPARK_PYTHON'] = sys.executable # "E:/python/python.exe"

  3. os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable # "E:/python/python.exe"

(1)map方法

功能:将RDD的数据一条条处理,返回新的RDD

语法:rdd.map(fun) # 需要传入一个函数

需求:给[1,2,3,4,5]每个数字乘以10

代码如下:

  1. import sys

  2. from pyspark import SparkConf, SparkContext

  3. import os

  4. # 添加'PYSPARK_PYTHON'和'PYSPARK_DRIVER_PYTHON'的执行环境

  5. os.environ['PYSPARK_PYTHON'] = sys.executable # "E:/python/python.exe"

  6. os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable # "E:/python/python.exe"

  7. # 创建SparkConf类对象

  8. conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") # 链式调用,不管调用什么样的方法,返回的都是同一个对象

  9. # 基于SparkConf类对象创建SparkContext对象

  10. sc = SparkContext(conf=conf)

  11. # 需求:通过map方法将全部数据都乘以10

  12. rdd = sc.parallelize([1, 2, 3, 4, 5])

  13. # 把rdd中的每一个数据都调用匿名函数(也可通过def重新定义函数)去处理

  14. rdd2 = rdd.map(lambda x: x * 10)

  15. print(rdd2.collect())

  16. # 停止SparkContext对象的运行(停止pyspark程序)

  17. sc.stop()

(2)flatMap方法

功能:对RDD执行map操作,然后进行解除嵌套操作

  1. # 嵌套的list

  2. list = [[1,2,3],[4,5,6],[7,8,9]]

  3. # 解除了嵌套

  4. list = [1,2,3,4,5,6,7,8,9]

示例代码:

  1. import sys

  2. from pyspark import SparkConf, SparkContext

  3. import os

  4. # 添加'PYSPARK_PYTHON'和'PYSPARK_DRIVER_PYTHON'的执行环境

  5. os.environ['PYSPARK_PYTHON'] = sys.executable # "E:/python/python.exe"

  6. os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable # "E:/python/python.exe"

  7. # 创建SparkConf类对象

  8. conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") # 链式调用,不管调用什么样的方法,返回的都是同一个对象

  9. # 基于SparkConf类对象创建SparkContext对象

  10. sc = SparkContext(conf=conf)

  11. rdd = sc.parallelize(["hello python", "hello word", "hello friend"])

  12. # 需求:将RDD中的每一个单词都提取出来

  13. rdd2 = rdd.flatMap(lambda x: x.split(" "))

  14. print(rdd2.collect())

(3)reduceByKey方法

功能:针对KV型RDD(二元元组),自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作。

语法:

  1. rdd.reduceByKey(func)

  2. # func:(V,V)-> V

  3. # 接受2个传入参数(类型要一致),返回一个返回值,类型和传入要求一致。

示例代码:

  1. # 前面创建RDD对象不做赘述了

  2. # 准备一个RDD

  3. rdd = sc.parallelize([('男', 99), ('男', 80), ('男', 70), ('女', 100), ('女', 85)])

  4. # 求男生和女生两个组的成绩之和

  5. rdd2 = rdd.reduceByKey(lambda a, b: a + b)

  6. print(rdd2.collect())

案例1

对下面txt文件中的单词进行计数统计 

代码如下:

  1. import sys

  2. from pyspark import SparkConf, SparkContext

  3. import os

  4. # 添加'PYSPARK_PYTHON'和'PYSPARK_DRIVER_PYTHON'的执行环境

  5. os.environ['PYSPARK_PYTHON'] = sys.executable # "E:/python/python.exe"

  6. os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable # "E:/python/python.exe"

  7. # 创建SparkConf类对象

  8. conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") # 链式调用,不管调用什么样的方法,返回的都是同一个对象

  9. # 基于SparkConf类对象创建SparkContext对象

  10. sc = SparkContext(conf=conf)

  11. # 读取数据文件

  12. rdd = sc.textFile("G:/hello.txt")

  13. # 读取全部单词

  14. word_rdd = rdd.flatMap(lambda x: x.split(" "))

  15. # 将所有单词都转换成二元元组,单词为key,value设置为1

  16. word_one_rdd = word_rdd.map(lambda word: (word, 1))

  17. # 分组并求和传承传承

  18. result_rdd = word_one_rdd.reduceByKey(lambda a, b: a + b)

  19. print(result_rdd.collect())

(4)filter方法

功能:过滤想要的数据进行保留

语法:

  1. rdd.filter(func)

  2. # func: (T)-->bool 传入1个参数进来随意类型,返回值必须是true or false

示例代码:

  1. # 前面创建RDD对象不做赘述了

  2. # 准备一个RDD

  3. rdd = sc.parallelize([1, 2, 3, 4, 5])

  4. # 对RDD的数据进行过滤,偶数返回true,保留偶数

  5. rdd2 = rdd.filter(lambda num: num % 2 == 0)

  6. print(rdd2.collect())

(5)distinct方法

功能:对RDD数据进行去重,返回新RDD

语法:rdd.distinct()

实例代码:

  1. # 前面创建RDD对象不做赘述了

  2. # 准备一个RDD

  3. rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 4, 5, 6, 7, 7, 8, 9])

  4. # 对RDD的数据进行去重

  5. rdd2 = rdd.distinct()

  6. print(rdd2.collect())

(6)stortBy方法

功能:对RDD数据进行排序,基于你指定的排序依据

语法:

  1. rdd.sortBy(func,ascending=False, numPartitions=1)

  2. # func: (T) -> U: 告知按照rdd中的哪个数据进行排序,比如 lambda x: [1] 表按照rdd中的第二列元素进行排序

  3. # ascending True升序 False 降序

  4. # numPartitions: 用多少分区排序

示例代码:

以上对单词进行计数统计的案例1中,对结果按照单词出现的次数从大到小进行排序

  1. # 对结果进行排序

  2. final_rdd = result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)

  3. print(final_rdd.collect())

案例2

需求:对以下文件使用spark读取文件进行计算:

(1)各个城市销售额排名,从大到小

(2)全部城市,有哪些商品类别在售卖

(3)北京市有哪些商品类别在售卖

案例代码:

  1. import sys

  2. from pyspark import SparkConf, SparkContext

  3. import os

  4. import json

  5. # 添加'PYSPARK_PYTHON'和'PYSPARK_DRIVER_PYTHON'的执行环境

  6. os.environ['PYSPARK_PYTHON'] = sys.executable # "E:/python/python.exe"

  7. os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable # "E:/python/python.exe"

  8. # 创建SparkConf类对象

  9. conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app") # 链式调用,不管调用什么样的方法,返回的都是同一个对象

  10. # 基于SparkConf类对象创建SparkContext对象

  11. sc = SparkContext(conf=conf)

  12. # (1)城市销售额排名

  13. # 读取数据文件

  14. file_rdd = sc.textFile("G:/orders.txt")

  15. # 读取文件中单个json字符串

  16. json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))

  17. # 将单个字符串转换为字典

  18. dict_rdd = json_str_rdd.map(lambda x: json.loads(x))

  19. # 取出城市和销售额数据(城市,销售额)

  20. city_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))

  21. # 按照城市分组,按照销售额累计

  22. city_result_rdd = city_money_rdd.reduceByKey(lambda a, b: a + b)

  23. # 按照销售额累计结果进行排序

  24. result1_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)

  25. print("各个城市销售额排名,从大到小:", result1_rdd.collect())

  26. # (2)全部城市,有哪些商品类别在售卖

  27. category_rdd = dict_rdd.map(lambda x: x['category']).distinct()

  28. print("全部城市,有哪些商品类别在售卖:", category_rdd.collect())

  29. # (3)北京市有哪些商品类别在售卖

  30. beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')

  31. beijing_category = beijing_data_rdd.map(lambda x: x['category']).distinct()

  32. print('北京市有哪些商品类别在售卖:', beijing_category.collect())

数据输出

RDD的结果输出为Python对象的各类方法

(1)collect方法

功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象

语法:rdd.collect(),返回值是一个list

此方法我们前面一直在使用,不做赘述

(2)reduce方法

功能:对RDD数据集按照你传入的逻辑进行聚合

语法:rdd.reduce(func),两参数传入,1个返回值,返回值和参数要求类型一致

示例代码:

  1. rdd = sc.parallelize([1, 2, 3, 4, 5])

  2. print(rdd.reduce(lambda a, b: a + b))

  3. # 输出结果为两两相加的值:15

(3)take方法

功能:取RDD的前n个元素,组成list返回

语法:rdd.take(num),num代表前几个元素

示例代码:

  1. rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

  2. print(rdd.take(5))

  3. # 输出结果为:[1, 2, 3, 4, 5]

(4)count方法

功能:计算RDD有多少条数据,返回值是一个数字

用法:rdd.count()

示例代码:

  1. rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

  2. print(rdd.count())

  3. # 输出结果为:10

注意:

关于Spark的方法(算子)还有很多很多,我们目前不是深入学习python语言,同时也没有学习分布式,只是简单学习对自动化测试打下基础,所以本篇文章只介绍了最基础的几个。

将RDD的内容输出到文件中

(5)saveAsTextFile方法

功能:将RDD的数据写入文本文件中,支持本地写出,hdfs(Hadoop分布式文件系统)等文件系统。

语法: rdd.saveAsTextFile("../data/output/test.txt")

想要这个方法正常运行,还需要配置Hadoop依赖,自行百度配置

运行之后,内容会存在多个分区中,输出的结果是一个文件夹,有几个分区就输出多少个结果文件

修改RDD分区为1个

方式1:SparkConf对象设置conf.set("spark.default.parallelism", "1")

方式2:创建RDD的时候,sc.parallelize方法传入numSlices参数为1

  1. # 方式1,SparkConf对象设置属性全局并行度为1:

  2. conf = SparkConf().setMaster("Tocal[*]").setAppName("test_spark")

  3. conf.set("spark.default.parallelism", "1")

  4. sc = SparkContext(conf=conf)

  5. # 方式2, 创建RDD的时候设置(parallelize方法传入numSlices参数为1)

  6. rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)

  7. rdd1 = sc.parallelize([1, 2, 3, 4, 5], 1)

行动吧,在路上总比一直观望的要好,未来的你肯定会感 谢现在拼搏的自己!如果想学习提升找不到资料,没人答疑解惑时,请及时加入扣群: 320231853,里面有各种软件测试+开发资料和技术可以一起交流学习哦。

最后感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/686151.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[NISACTF 2022]popchains

第一步:看到 include($value); 作为链尾,则要触发 append($value) -->>__invoke(),看到$function()。 __invoke():对象以函数形式被调用时触发 第二步:$function() ,则要触发 __get($key)&#xff0…

【电子商务设计师】2024年5月考试答题技巧与注意事项

电子商务设计师答题技巧: 1、综合知识 (1)首先是分析试题的技巧 --先看清楚问题,再看选项; --判断题目到底考察的是什么知识点,排除干扰项。 (2)掌握答题的技巧 --题目往往会选…

Java入门基础学习笔记13——数据类型

数据类型的分类: 基本数据类型 引用数据类型 基本数据类型:4大类8种类型: 定义整形用int,再大的数用long。 package cn.ensource.variable;public class VariableDemo2 {public static void main(String[] args) {//目标&#x…

文章分享:《肿瘤DNA甲基化标志物检测及临床应用专家共识(2024版)》

本文摘自于《肿瘤DNA甲基化标志物检测及临床应用专家共识(2024版)》 目录 1. DNA甲基化标志物概述 2 DNA甲基化标志物的临床检测 2.1 临床样本前处理注意事项 2.2 DNA甲基化标志物检测技术方法 2.2.1 DNA提取与纯化 2.2.2 DNA转化 2.2.3 DNA 甲基…

中国地面基本气象逐小时数据获取方式

环境气象数据服务平台提供了全国大约2100个点位,2023年1月1日至今的小时级数据。包括气温、气压、湿度、风、降水等要素。 数据基于ECMWF ERA5-Land Hourly陆面再分析资料和中国地面基本气象观测逐三小时数据,使用机器学习模型加工所得,对比…

Hive Windows Functions 窗口函数

Hive Windows Functions 窗口函数 在 Hive 中,窗口函数(Window Functions)用于在查询结果中执行聚合、排序和分析操作,而无需将数据分组。窗口函数允许你在查询结果中的一组行上执行计算,而不会改变原始数据的行数&am…

基于SSM的文化遗产的保护与旅游开发系统(有报告)。Javaee项目。ssm项目。

演示视频: 基于SSM的文化遗产的保护与旅游开发系统(有报告)。Javaee项目。ssm项目。 项目介绍: 采用M(model)V(view)C(controller)三层体系结构,…

【typescript测试 - Jest 配置与使用】

安装 npm install --save-dev types/jestnpm install --save-dev ts-jest配置 tsconfig.json {"compilerOptions": {"types": ["jest"]} }jest.config.js module.exports {preset: ts-jest,testEnvironment: node, };使用 // add.js funct…

企业数据有什么价值?

在当下的数字经济时代,数据已上升为国家重要的基础性战略资源,加快建设数字中国、网络强国这一蓝图的实现,离不开数据要素的支撑。数据作为新型生产要素,具有非消耗性、非竞争性等特征,为突破传统生产要素的增长约束提…

Jetpack Compose一:初步了解Compose

Intellij IDEA构建Android开发环境 IntelliJ IDEA 2023.2.1 Android开发变化 IDEA配置使用Gradle 新建Compose工程,取名ComposeStudy 可以看到的是IDEA为项目初始化了部分代码 使用Compose开发不再需要使用xml文件来设计布局了 Compose中的Text也不同于Android V…

会话劫持攻击就在我们身边,我们要如何防范

会话劫持攻击(Session Hijacking)是一种网络攻击方式,攻击者通过某种手段获取到用户的会话标识(Session ID),然后使用这个会话标识冒充合法用户进行恶意操作。这种攻击方式允许攻击者以合法用户的身份访问受…

基于SpringBoot+Vue点餐系统设计和实现(源码+LW+部署讲解)

🌹作者简介:✌全网粉丝10W,前大厂员工,多篇互联网电商推荐系统专利,现有多家创业公司,致力于建站、运营、SEO、网赚等赛道。也是csdn特邀作者、博客专家、Java领域优质创作者,博客之星、掘金/华…