RDD算子介绍

1. RDD算子

RDD算子也叫RDD方法,主要分为两大类:转换和行动。转换,即一个RDD转换为另一个RDD,是功能的转换与补充,比如map,flatMap。行动,则是触发任务的执行,比如collect。所谓算子(Operator),就是通过操作改变问题的状态(来源于认知心理学)。RDD算子有Value类型,双Value类型和Key-Value类型。

2. map

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD : RDD[Int] = rdd.map(num=>num*2)
mapRDD.collect().foreach(println)
val rdd : RDD[String] = sc.textFile("data")
val mapRDD : RDD[String] = rdd.map(line => {val datas = line.split(" ")datas(3)
})
mapRDD.collect().foreach(println)

为观察map阶段的分区并行计算过程,添加如下打印

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD1 : RDD[Int] = rdd.map(num => {println(">>>>>>>>")num
})
val mapRDD2 : RDD[Int] = rdd.map(num => {println("######")num
})
mapRDD2.collect().foreach(println)

结果如下:

 

看不出什么规律,改为1个分区:

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 1)
val mapRDD1 : RDD[Int] = rdd.map(num => {println(">>>>>>>>")num
})
val mapRDD2 : RDD[Int] = rdd.map(num => {println("######")num
})
mapRDD2.collect().foreach(println)

结果如下:

 

所以,RDD的计算对于分区内的数据是一个个执行的,即分区内数据的执行是有序的,但是分区间的数据执行是无序的。

3. mapPartitions

上述的map算子对于分区内的数据是一个个依次进行操作,可能存在性能问题,而mapPartitions算子是对于整个分区的数据整体进行操作,但是可能会占用大量空间(以空间换时间)。mapPartitions的参数是iter=>iter。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD : RDD[Int] = rdd.mapPartitions(iter => {println(">>>>>>>>")iter.map(_*2)
})
mapRDD.collect().foreach(println)

结果如下:

 

因为只有两个分区,所以打印两次">>>>>>>>"。使用mapPartitions获取每个分区的最大值:

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD : RDD[Int] = rdd.mapPartitions(iter => {List(iter.max).iterator
})
mapRDD.collect().foreach(println)

这个功能是map算子所实现不了的,因为map算子并不能感知数据来源于分区,而mapPartitions可以以分区为单位进行数据处理(批处理操作)。

4. mapPatitionsWithIndex

mapPartitions虽然以分区为单位进行数据批处理,但是其实也感知不到分区是哪个分区,在一些需要知道分区号的场景下,需要用到mapPatitionsWithIndex。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD : RDD[Int] = rdd.mapPartitionsWithIndex((index, iter) => {if (index == 1) {iter} else {Nil.iterator}
})
mapRDD.collect().foreach(println)

上述代码实现了保留第二个(索引为1)分区,结果如下:

 

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD : RDD[Int] = rdd.mapPartitionsWithIndex((index, iter) => {iter.map(num => (index, num))
})
mapRDD.collect().foreach(println)

上述代码实现了查看每个数据在哪个分区,结果如下:

5. flatMap

flatMap做扁平化映射

val rdd : RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4)))
val mapRDD : RDD[Int] = rdd.flatMap(list => list)
mapRDD.collect().foreach(println)
val rdd : RDD[String] = sc.makeRDD(List("Hello Spark", "Hello Scala"))
val mapRDD : RDD[String] = rdd.flatMap(s => s.split(" "))
mapRDD.collect().foreach(println)

结果如下:

 

将List(List(1,2), 3, List(4,5))进行扁平化操作(使用模式匹配):

val rdd : RDD[List[Int]] = sc.makeRDD(List(List(1, 2), 3, List(4, 5)))
val mapRDD : RDD[Int] = rdd.flatMap(data => {data match {case list:List[] => listcase num => List(num)}
})
mapRDD.collect().foreach(println)

6. glom

glom操作有点类似于flatMap的逆操作,将分区内的数据转换为相同类型的内存数组,分区不变。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val glomRDD : RDD[Array[Int]] = rdd.glom()
glomRDD.collect().foreach(data=>data.mkString(","))

结果如下:

 

求各分区最大值之和:

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val glomRDD : RDD[Array[Int]] = rdd.glom()
val maxRDD : RDD[Int] = glomRDD.map(array => array.max)
println(maxRDD.collect().sum))

7. groupBy

按照指定的key进行分组

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val groupRDD : RDD[(Int, Iterable[Int])] = rdd.groupBy(num => num % 2)
groupRDD.collect().foreach(println)

结果如下:

 

按照首字母分组

val rdd : RDD[String] = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2)
val groupRDD = rdd.groupBy(s => s.charAt(0))
groupRDD.collect().foreach(println)

结果如下:

 

分组的过程可能会打乱数据,即数据可能会重新组合,原分区的数据被分到另一个分区了,即shuffle过程。极限情况下,数据可能被分到一个分区中。一个组的数据在一个分区中,但是一个分区不一定只有一个组。

8. filter

过滤偶数

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val filterRDD : RDD[Int] = rdd.filter(num % 2 == 0)
filterRDD.collect().foreach(println)

按照指定规则进行数据过滤,分区不变,过滤后,不同分区内的数据可能不均衡,即数据倾斜。 

过滤指定日期的数据:

val rdd : RDD[String] = sc.textFile("data")
val filterRDD : RDD[String] = rdd.filter(line => {val datas = line.split(" ")datas(3).startWith("17/05/2015")
})
filterRDD.collect().foreach(println)

9. sample

采样/抽取数据,用的一般不多,其中一个用途可能是解决数据倾斜问题。sample算子主要有三个参数,第一个是抽取的数放不放回去,第二个参数是概率,如果抽取不放回,则表示每个数被抽取的概率,如果抽取放回,则表示某个数可能的抽取次数(可能的次数而已),第三个参数是随机数算法种子(一般可不填,如果填了,可能会导致抽取结果固定)。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val sampleRDD : RDD[Int] = rdd.sample(false, 0.4, 1)
println(sampleRDD.collect().mkstring(","))

 结果如下:

多运行几次,发现结果不变,因为随机数算法种子固定了,如果不传,则默认使用系统时间(变化的)。

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val sampleRDD : RDD[Int] = rdd.sample(false, 0.4)
println(sampleRDD.collect().mkstring(","))

此时结果就不固定,结果都不一定为4个数。

根据源码,如果抽取不放回,抽取算法为伯努利分布,如果抽取放回,则为泊松分布。

如果抽取放回,

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val sampleRDD : RDD[Int] = rdd.sample(true, 2)
println(sampleRDD.collect().mkstring(","))

 结果如下:

10. distinct

distinct算子用于去重

val rdd : RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))
val distinctRDD : RDD[Int] = rdd.distinct()
distinctRDD.collect().foreach(println)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/504627.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python自动化之项目架构搭建与思路讲解(第二天)

1.自动化测试的概念 自动化测试是指使用自动化工具和脚本来执行测试任务,以验证软件或系统的正确性和稳定性。它可以提高测试的效率和准确性,并节约时间和成本。 2.自动化脚本编写的思路 xmind文档如有需要,可在资源里自行下载 3.项目代码工程创建 lib :基本代码库包 …

像用Excel一样用Python:pandasGUI

文章目录 启动数据导入绘图 启动 众所周知,pandas是Python中著名的数据挖掘模块,以处理表格数据著称,并且具备一定的可视化能力。而pandasGUI则为pandas打造了一个友好的交互窗口,有了这个,就可以像使用Excel一样使用…

基于SpringBoot多模块项目引入其他模块时@Autowired无法注入

基于SpringBoot多模块项目引入其他模块时Autowired无法注入 一、问题描述1、解决方案 一、问题描述 启动Spring Boot项目时报 Could not autowire. No beans of ‘xxxxxxxx’ type found. 没有找到bean的实例,即spring没有实例化对象,也就无法根据配置文…

AcWing 788. 逆序对的数量 解题思路及代码

先贴个题目: 以及原题链接: 788. 逆序对的数量 - AcWing题库https://www.acwing.com/problem/content/790/ 这题也是板子题,就是对归并排序的衍生,我们先分析下如果用归并排序对排序区间进行二分的话,逆序对可能出现的…

回溯是怎么回事(算法村第十八关青铜挑战)

组合 77. 组合 - 力扣(LeetCode) 给定两个整数 n 和 k,返回范围 [1, n] 中所有可能的 k 个数的组合。 你可以按 任何顺序 返回答案。 示例 1: 输入:n 4, k 2 输出: [[2,4],[3,4],[2,3],[1,2],[1,3],…

3.3 序列式容器-deque、stack、queue、heap、priority_queue

deque 3.1定义 std::deque(双端队列)是C标准模板库(STL)中的一种容器,表示双端队列数据结构。它提供了在两端高效地进行插入和删除操作的能力。与vector的连续线性空间类似,但有所不同,deque动…

【嵌入式——QT】QListWidget

QListWidget类提供了一个基于项的列表小部件,QListWidgetItem是列表中的项,该篇文章中涉及到的功能有添加列表项,插入列表项,删除列表项,清空列表,向上移动列表项,向下移动列表项。 常用API a…

python接口自动化(一)--什么是接口、接口优势、类型(详解)

简介 经常听别人说接口测试,接口测试自动化,但是你对接口,有多少了解和认识,知道什么是接口吗?它是用来做什么的,测试时候要注意什么?坦白的说,笔者之前也不是很清楚。接下来先看一下…

鬼屋游戏c++

c #include <iostream> #include <string> #include <vector> #include <cstdlib> // 用于随机数生成 #include <ctime> // 用于随机数种子using namespace std;// 定义房间结构体 struct Room {string description;bool hasKey;bool hasClue…

基于yolov5的电瓶车和自行车检测系统,可进行图像目标检测,也可进行视屏和摄像检测(pytorch框架)【python源码+UI界面+功能源码详解】

功能演示&#xff1a; 基于yolov5的电瓶车和自行车检测系统_哔哩哔哩_bilibili &#xff08;一&#xff09;简介 基于yolov5的电瓶车和自行车检测系统是在pytorch框架下实现的&#xff0c;这是一个完整的项目&#xff0c;包括代码&#xff0c;数据集&#xff0c;训练好的模型…

YOLOv9:Learning What You Want to Learn Using Programmable Gradient Information

YOLOv9&#xff1a;Learning What You Want to Learn Using Programmable Gradient Information 摘要 今天的深度学习方法关注的是如何设计最合适的目标函数&#xff0c;使模型的预测结果最接近ground truth的真实情况。同时&#xff0c;必须设计一个适当的体系结构&#xff…

WebServer -- 注册登录

目录 &#x1f349;整体内容 &#x1f33c;流程图 &#x1f382;载入数据库表 提取用户名和密码 &#x1f6a9;同步线程登录注册 补充解释 代码 &#x1f618;页面跳转 补充解释 代码 &#x1f349;整体内容 概述 TinyWebServer 中&#xff0c;使用数据库连接池实现…