Spark---RDD持久化

文章目录

  • 1.RDD持久化
      • 1.1 RDD Cache 缓存
      • 1.2 RDD CheckPoint 检查点
      • 1.3 缓存和检查点区别
  • 2.RDD分区器
      • 2.1 Hash 分区:
      • 2.2 Range 分区:
      • 2.3 用户自定义分区

1.RDD持久化

在Spark中,持久化是将RDD存储在内存中,以便在多次计算之间重复使用。这可以显著减少不必要的计算,提高Spark应用程序的性能。
在这里插入图片描述

    val lines = context.textFile("D:\\learnSoftWare\\IdeaProject\\Spark_Demo\\Spark_Core\\src\\main\\com.mao\\datas\\1.txt")//执行扁平化操作//扁平化就是将多个集合打散为一个集合val words = lines.flatMap((a: String) => a.split(" "))//对每个单词进行改造(hello,1)val wordMap = words.map(word => {println("@@@@@@@@@@@@@@@@@")(word, 1)})//reduceByKey要使用wordMapval wordToCount=wordMap.reduceByKey((t1,t2)=>t1+t2)wordToCount.collect().foreach(println)//groupByKey要使用wordMapval wordToGroup = wordMap.groupByKey()wordToGroup.collect().foreach(println)

在这里插入图片描述

如上述代码,reduceByKey和groupByKey都要是用wordMap的结果,由于RDD中是不存储数据的,在reduceByKey使用完成之后,groupByKey想要再次使用的时候,需要查找血缘关系,从头开始一步一步的执行,如果数据量大的情况下,会造成很大的成本上的浪费。

1.1 RDD Cache 缓存

RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。 但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。

可以通过设置persist来将数据存储到内存或者磁盘中

    // 数据缓存。wordMap.cache()// 可以更改存储级别//wordMap.persist(StorageLevel.MEMORY_AND_DISK_2)

在这里插入图片描述
可以看出将中间结果放入缓存中后,第二次使用中间结果的时候,将不会从头再执行一遍,而是直接从缓存中读取数据,

存储级别:

object StorageLevel {val NONE = new StorageLevel(false, false, false, false)val DISK_ONLY = new StorageLevel(true, false, false, false)val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)val MEMORY_ONLY = new StorageLevel(false, true, false, true)val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

在这里插入图片描述
Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist 或 cache。

1.2 RDD CheckPoint 检查点

检查点其实就是通过将 RDD 中间结果写入磁盘
由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。

检查点的数据需要进行落盘,当作业执行结束后,不会被删除。检查点数据一般都是存储在hdfs上。

    //建立与Spark框架的连接val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")//配置文件val context = new SparkContext(wordCount)//读取配置文件// 设置检查点路径//这里选择将数据落盘在本地context.setCheckpointDir("./checkpoint1")//执行业务操作val lines = context.textFile("D:\\learnSoftWare\\IdeaProject\\Spark_Demo\\Spark_Core\\src\\main\\com.mao\\datas\\1.txt")//执行扁平化操作//扁平化就是将多个集合打散为一个集合val words = lines.flatMap((a: String) => a.split(" "))//对每个单词进行改造(hello,1)val wordMap = words.map(word => {println("@@@@@@@@@@@@@@@@@")(word, 1)})// 数据缓存。wordMap.cache()// 数据检查点:针对 wordToOneRdd 做检查点计算wordMap.checkpoint()//reduceByKey要使用wordMapval wordToCount=wordMap.reduceByKey((t1,t2)=>t1+t2)wordToCount.collect().foreach(println)//groupByKey要使用wordMapval wordToGroup = wordMap.groupByKey()wordToGroup.collect().foreach(println)//关闭连接context.stop()

1.3 缓存和检查点区别

1)Cache 缓存只是将数据保存起来,不切断血缘依赖,即Cache相当于增加一个依赖关系。Checkpoint 检查点切断血缘依赖。
2)Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存
储在 HDFS 等容错、高可用的文件系统,可靠性高。
3)建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存
中读取数据即可,否则需要再从头计算一次 RDD。

2.RDD分区器

Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区。Hash 分区为当前的默认分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分区,进而决定了 Reduce 的个数。

只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None
每个 RDD 的分区 ID 范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

val dataRdd=sparkRdd.makeRDD(List(1,2,3,4),2)

在上述代码中创建的dataRDD是没有分区器的,但是Spark会根据指定的分区数(在这个例子中是2)来将数据分布到不同的分区中,但这种分布并不是由分区器控制的。而是根据数据的本地性(locality)或者简单的轮询(round-robin)方式来分配的。

2.1 Hash 分区:

对于给定的 key,计算其 hashCode,并除以分区个数取余

2.2 Range 分区:

将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序

2.3 用户自定义分区

自定义分区器,实现将王同学分到一个分区,张同学分到另一个分区

package bigdata.wordcount.RDDimport org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}/*** 自定义分区器*/
object SelfPartitioner {def main(args: Array[String]): Unit = {//建立与Spark框架的连接val rdd = new SparkConf().setMaster("local[*]").setAppName("RDD") //配置文件val sparkRdd = new SparkContext(rdd) //读取配置文件val dataRDD: RDD[(String, String)] = sparkRdd.makeRDD(List(("小王", "xxxxxxxxx"),("小张", "xxxxxxxxx"),("小王", "xxxxxxxxx"),), 2)//采用自定义分区器val dataRes: RDD[(String, String)] = dataRDD.partitionBy(new MyPartitioner)dataRes.saveAsTextFile("output0")sparkRdd.stop()}}/*** 自定义分区器* 1.继承Partitioner* 2.重写相关方法*/
class MyPartitioner extends Partitioner
{//设置分区数量override def numPartitions: Int = 2//根据得到的key值获取到分区的索引(从0开始)override def getPartition(key: Any): Int = {key match {case "小王" => 0case "小张" => 1}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/345773.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

浅谈6种流行的API架构风格

前言 API在现代软件开发中扮演着重要的角色,它们是不同应用程序之间的桥梁。编写业务API是日常开发工作中最常见的一部分,选择合适的API框架对项目的成功起到了至关重要的作用。本篇文章将浅谈一下当前6种流行的API架构风格的优点、缺点以及适用场景。 6…

第22集《佛法修学概要》

请大家打开讲义第六十一页。 我们这一科讲到归敬三宝。前面讲到,我们在心中能够受持“常住”两个字,就能够远离三恶道。“常住”是针对生灭来说的,我们的心是没有常住的。凡夫的心深受感受的刺激,一接触外境就带动感受&#xff0…

MySQL的各种日志

目录 一、错误日志 二、二进制日志 1、介绍 2、作用 3、相关信息 4、日志格式 5、查看二进制文件 6、二进制日志文件删除 三、查询日志 四、慢日志 一、错误日志 记录MySQL在启动和停止时,以及服务器运行过程中发生的严重错误的相关信息,当数据库…

09-Python服务链路追踪案例

skyWalking Python agent requires SkyWalking 8.0 and Python 3.7 # 将django包导入 ~$ cd /apps ~$ tar xf django-test.tgz ~$ cd django-test# 安装模块 ~$ apt install python3-pip ~$ pip3 install -r requirements.txt# 创建django项目mysite ~$ django-admin startpro…

【电源专题】案例:在EN脚加个电阻就能解决电源下电输出振荡?

案例背景:在某产品上使用一颗升压芯片发现下电输出波形振荡,但此产品并不是第一个使用此升压芯片的。早先此升压芯片使用在其他产品上没有报过这个异常。 分析方法:使用DEMO板,查看标准DEMO板无异常。将异常板卡上的参数与全部换到DEMO板上发现同样存在异常。 推测原因:…

2024年该如何招聘科技人员

我的新书《Android App开发入门与实战》已于2020年8月由人民邮电出版社出版,欢迎购买。点击进入详情 过去几年科技领域发生了令人难以置信的动荡。我可以有把握地说,今天的就业市场比 2000 年代我第一次成为开发人员时更具挑战性。人工智能的繁荣与前所…

用TF-IDF处理文本数据

计算机擅长处理数字,但不擅长处理文本数据,TF-IDF是处理文本数据最广泛使用的技术之一,本文对它的工作原理以及它的特性进行介绍。 根据直觉,我们认为在文本数据分析中出现频率更高的单词应该具有更大的权重,但事实并…

JavaScript数组全攻略

🧑‍🎓 个人主页:《爱蹦跶的大A阿》 🔥当前正在更新专栏:《VUE》 、《JavaScript保姆级教程》、《krpano》 ​ ​ 目录 ✨ 前言 数组的定义 创建数组 1. 数组字面量 2. Array构造函数 3. Array.of() 4. Arra…

Unity摇杆+键鼠控制位移、旋转

1、位移 首先我们找到两张图片,一个大圆一个小圆,像这样: 结构是这样的: 然后,新建一个场景,用胶囊去做玩家,摄像机在胶囊下,并且在场景中放两个cube作为参照物 像这样搭好后&#…

STM32——OLED实验

1.OLED简介 OLED,即有机发光二极管 OLED引脚说明 引脚说明: 1、CS:OLED片选信号(低电平有效) 2、WR:向OLED写入数据 3、RD:向OLED读取数据 4、D[7:0]:8位双向数据线,有…

API Monitor简易使用教程 监控Windows dll调用 监控Windows API调用 查看函数名,参数类型,参数,返回值

先看效果,可以显示所有dll及windows api的指定函数调用,以及传递的参数查看与修改。 官网下载 也有教程 我验证使用方法 1、API Filter窗口:选定要监听的dll函数或windows API,可以打断点 选中并右键勾上Breakpoint 选 Before C…

线程安全--互斥锁

文章目录 一.线程安全问题读取无效(脏)数据丢失更新线程安全的保证--操作的原子性 二.互斥锁及其实现原理互斥锁的实现原理pthread线程库提供的锁操作 三.死锁问题 一.线程安全问题 当多个线程并发地对同一个共享资源进行修改操作时,可能会引发数据读写错误(比如读取无效(脏)数…