Spark编程-键值对RDD(K,V)创建及常用操作

简述

        SparkRDD中可以包含任何类型的对象,在实际应用中,“键值对”是一种比较常见的RDD元素类型,分组和聚合操作中经常会用到,尤其是groupByKey和reduceByKey。
        Spark操作中经常会用到“键值对RDD”(Pair RDD),用于完成聚合计算。普通RDD里面存储的数据类型是Int、String等,而“键值对RDD”里面存储的数据类型是“键值对”。

生产环境用到的操作

        以下为我在生产环境用到的操作

WordCount:

        统计文本中每个单词出现的次数,使用Pair RDD将每个单词作为键,将出现次数作为值,然后进行reduceByKey操作进行聚合。

分组聚合:

        将具有相同键的元素分组在一起,并对每个键的值进行聚合操作,如groupByKey、reduceByKey等。

数据连接和关联:

        使用键值对进行数据的连接和关联操作,如join、cogroup等。

数据预处理:

        对数据进行分组、排序、过滤等预处理操作,如groupBy、sortByKey、filter等。

数据分析和统计:

        使用Pair RDD进行数据分析和统计操作,如计算平均值、求和、最大值、最小值等。 通过Pair RDD,可以更方便地处理键值对数据,实现更灵活和复杂的数据处理和分析需求。

Pair RDD的创建方式

第一种:从文件中加载数据创建pairRDD

//测试数据,自己编的,文件名为personID
591,2021,15448329898,北京,彩信
592,2022,15648029823,河北,微信
593,2022,16742329894,山西,电话
594,2020,17748529893,海南,微信
595,2020,19048729896,大连,QQ

代码及运行结果

scala> val lines = sc.textFile("file:///data/testdata/personID.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///data/testdata/personID.txt MapPartitionsR                                    DD[1] at textFile at <console>:23scala> val pairRDD = lines.flatMap(elem => (elem + 1))
pairRDD: org.apache.spark.rdd.RDD[Char] = MapPartitionsRDD[2] at flatMap at <console>:23scala> val pairRDD = lines.flatMap(line => line.split(",")).map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:                                    23scala> pairRDD.foreach(println)
(591,1)0:>                                                          (0 + 1) / 1]
(2023,1)
(15448329898,1)
(北京,1)
(彩信,1)
(592,1)
......

从代码执行结果来看:

 返回的结果是键值对类型的RDD,即RDD[(String, Int)]。从pairRDD.foreach(println)执行的打印输出结果也可以看到,都是由(单词,1)这种形式的键值对。

第二种:通过数组Array或集合List创建pairRDD

案例:

//使用array数组
scala> val array = Array("spark", "hadoop", "flink", "hive")
array: Array[String] = Array(spark, hadoop, flink, hive)
scala> val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val pairRDD = rdd.map(word =>(word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:23
scala> pairRDD.foreach(println)
(spark,1)
(hadoop,1)
(flink,1)
(hive,1)//使用list集合
scala> val list = List("hadoop","spark","hive")
list: List[String] = List(hadoop, spark, hive)scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24scala> val pairRDD = rdd.map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at map at <console>:23              ^scala> pairRDD.foreach(println)
(hadoop,1)
(spark,1)
(hive,1)

常用键值对转换操作

常用的键值对转换操作包括reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等

reduceByKey(func)

功能:使用func函数合并具有相同键的值。注意,这里强调合并相同键。

比如,reduceByKey((a,b) => a+b),有五个键值对(nlp,1)
        (nlp,1)
        (spark,1)
        (nlp,1)
        (hadoop,1)
        (hadoop,1)

对具有相同key的键值对进行合并后的结果就是:

        (spark,1)
        (hadoop,2)
        (nlp,3)
我们对上面第二种方式创建List集合得到的pairRDD进行reduceByKey()操作,代码如下:

scala> val list = List("nlp","nlp","spark","nlp","hadoop","hadoop")
list: List[String] = List(nlp, nlp, spark, nlp, hadoop, hadoop)scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24scala> val pairRDD = rdd.map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[10] at map at <console>:23scala> pairRDD.foreach(println)
(nlp,1)
(nlp,1)
(spark,1)
(nlp,1)
(hadoop,1)
(hadoop,1)scala> pairRDD.reduceByKey((a,b) => a + b).foreach(println)
(spark,1)
(hadoop,2)
(nlp,3)

groupByKey()

功能:对具有相同键的值进行分组。注意,这里强调对相同的键分成一组。

比如,groupByKey((a,b) => a+b),有五个键值对(nlp,1)
        (nlp,1)
        (spark,1)
        (nlp,1)
        (hadoop,1)
        (hadoop,1)

我们对上面第二种方式创建得到的pairRDD进行groupByKey()操作,代码如下:

scala> pairRDD.groupByKey()
res17: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[13] at groupByKey at <console>:24
// 分组后,value被保存到Iterable[Int]中scala> pairRDD.groupByKey().foreach(println)
(spark,CompactBuffer(1))
(hadoop,CompactBuffer(1, 1))
(nlp,CompactBuffer(1, 1, 1))

keys

功能:会把键值对RDD中的key返回形成一个新的RDD。

scala> pairRDD.keys
res20: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at keys at <console>:24scala> pairRDD.keys.foreach(println)
nlp
nlp
spark
nlp
hadoop
hadoop

可以对返回的key的集合进行操作,比如说写入一个List集合中

scala> val prirRDDkeysList = pairRDD.keys.collect().toList
prirRDDkeysList: List[String] = List(nlp, nlp, spark, nlp, hadoop, hadoop)scala> val prirRDDkeysArray = pairRDD.keys.collect()
prirRDDkeysArray: Array[String] = Array(nlp, nlp, spark, nlp, hadoop, hadoop)

values

功能: 把键值对RDD中的value返回形成一个新的RDD。

scala> pairRDD.foreach(println)
(nlp,1)
(nlp,1)
(spark,1)
(nlp,1)
(hadoop,1)
(hadoop,1)scala> pairRDD.values.foreach(println)
1
1
1
1
1
1

        将得到的值保存到数组或集合中

scala> val prirRDDValuesList = pairRDD.values.collect().toList
prirRDDValuesList: List[Int] = List(1, 1, 1, 1, 1, 1)scala> val prirRDDValueArray = pairRDD.values.collect()
prirRDDValueArray: Array[Int] = Array(1, 1, 1, 1, 1, 1)

注意

        为什么会报错value collect is not a member of Unit ,因为foreach方法返回的是Unit类型,它没有collect方法。

scala> val prirRDDValuesList = pairRDD.values.foreach(println).collect().toList 
:26: error: value collect is not a member of Unit 
val prirRDDValuesList = pairRDD.values.foreach(println).collect().toList

工作中使用collect()导致的内存不足调优:

        当处理大数据集时,可以考虑使用Spark的分布式计算能力来处理数据,而不是将所有数据收集到驱动程序中。这样可以避免内存不足的问题。

        我使用collect方法将这个RDD中的元素收集到驱动程序,并返回一个数组。如果pairRDD中的数据量很大,collect操作可能会导致内存不足的问题,建议在处理大数据集时,谨慎使用collect方法。我们可以用很多方法来避免:

  1. 使用RDD转换操作:可以使用各种RDD转换操作,如mapfilterreduceByKey等,对数据集进行转换和聚合操作。这些操作在分布式环境下进行,可以利用集群中的多个节点进行计算。
  2. 使用RDD的collecttake方法:如果只需要获取部分数据,可以使用collect方法将数据收集到驱动程序中,确保数据量不会导致内存不足,可以使用take方法获取RDD中的前几个元素。
  3. 使用RDD的sample方法:可以使用sample方法对数据进行采样,从而获取数据集的一个子集。这样可以在处理大数据集时降低计算和内存的压力。
  4. 使用Spark SQL或DataFrame:如果数据集结构化且存储在支持Spark SQL的数据源中,可以使用Spark SQL或DataFrame API进行数据操作和分析。这些API提供了更高级的数据操作和查询功能。
  5. 使用持久化存储:如果需要将处理结果保存下来或供其他程序使用,可以将结果存储在持久化存储系统中,如HDFS或数据库。这样可以避免将所有数据收集到驱动程序中。 利用集群中的计算资源进行并行计算,避免将所有数据收集到驱动程序中,可以使用RDD转换操作、采样、分页获取等技术来处理数据。

sortByKey()

功能:是返回一个根据键排序的RDD。

scala> pairRDD.sortByKey().foreach(println)
(hadoop,1)
(hadoop,1)
(nlp,1)
(nlp,1)
(nlp,1)
(spark,1)

mapValues(func)  (常用)

功能:对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化。

        即我只对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。例如,对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)构成的pairRDD,如果执行pairRDD.mapValues(x => x+1),就会得到一个新的键值对RDD,它包含下面四个键值对("spark",2)、("spark",3)、("hadoop",4)和("hadoop",6)。

scala> pairRDD.mapValues(a => a*2).foreach(println)
(nlp,2)
(nlp,2)
(spark,2)
(nlp,2)
(hadoop,2)
(hadoop,2)

join  (常用)

功能:对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

        join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域,因此,join的类型也和关系数据库中的join一样,包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接,所以,join就表示内连接。
        对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

        比如,pairRDD1是一个键值对集合{("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5)},pairRDD2是一个键值对集合{("spark","fast")},那么,pairRDD1.join(pairRDD2)的结果就是一个新的RDD,这个新的RDD是键值对集合{("spark",1,"fast"),("spark",2,"fast")}。

案例代码:

scala> val paRDD1 = sc.parallelize(Array(("spark",2),("hadoop",3),("spark",1),("hive",4),("hadoop",2)))
paRDD1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[28] at parallelize at <console>:26scala> val paRDD2 = sc.parallelize(Array(("spark","nicetry"),("hadoop","good"),("spark",234),("hive",2314),("hadoop","ohho")))
paRDD2: org.apache.spark.rdd.RDD[(String, Any)] = ParallelCollectionRDD[29] at parallelize at <console>:26scala> paRDD1.join(paRDD2).foreach(println)
(spark,(2,nicetry))
(spark,(2,234))
(spark,(1,nicetry))
(spark,(1,234))
(hive,(4,2314))
(hadoop,(3,good))
(hadoop,(3,ohho))
(hadoop,(2,good))
(hadoop,(2,ohho))

    eg:现在来看林子雨教授讲解的是真清晰,温故而知新。

一个完整实例-计算每种图书的每天平均销量

思路

计算一天中各种类图书卖出去的平均值,键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量

步骤

1、构建数组,包含对应键值对,调用parallelize方法生成 RDD

2、针对构建得到的rdd,我们调用mapValues()函数,把rdd中的每个每个键值对(key,value)的value部分进行修改,把value转换成键值对(value,1),其中,数值1表示这个key在rdd中出现了1次,为什么要记录出现次数呢?因为,我们最终要计算每个key对应的平均值,所以,必须记住这个key出现了几次,最后用value的总和除以key的出现次数,就是这个key对应的平均值。

(注:collect()是一个行动操作,功能是以数组的形式返回数据集中的所有元素,当我们要实时查看一个RDD中的元素内容时,就可以调用collect()函数。)

3、调用reduceByKey()函数,此处必须要十分准确地理解reduceByKey()函数的功能 => 合并具有相同键的值。

        reduceByKey(func)的功能是使用func函数合并具有相同键的值。这里的func函数就是Lamda表达式(x,y) => (x._1+y._1,x._2 + y._2),这个表达式中,x和y都是value,而且是具有相同key的两个键值对所对应的value。

4、 计算最终结果。对得到的几个键值对构成的RDD执行mapValues()操作,得到每种书的每天平均销量。mapValues,key不变,只对值记性操作。value会被赋值给Lamda表达式x => (x._1 / x._2中的x,x的值就是(22,2),x._1就是22,表示hadoop书总销量是22,x._2就是2,表示2天,因此,hadoop书籍的每天平均销量就是x._1 / x._2,也就是11。mapValues()输出的一个键值对就是("hadoop",11),其他同理。

代码

//构建书籍及销量
scala> val books = sc.parallelize(Array(("book1",5),("book2",10),("book3",8),("book1",6),("book2",12)))
books: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:23
// 统计
scala> val sum_books = books.mapValues(x => (x,1)).foreach(println)
(book1,(5,1))
(book2,(10,1))
(book3,(8,1))
(book1,(6,1))
(book2,(12,1))
sum_books: Unit = ()
//计算出现次数,value中,前面是总数,后面是天数,如(11,2),表示2天卖出11本
scala> val average_books = books.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1 + y._1 , x._2 + y._2)).foreach(println)
(book1,(11,2))
(book3,(8,1))
(book2,(22,2))
average_books: Unit = ()//平均值统计
scala> val average_books = books.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1 + y._1 , x._2 + y._2)).mapValues(x => x._1 / x._2).foreach(println)
(book1,5)
(book3,8)
(book2,11)
average_books: Unit = ()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/23067.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

pdf文档加水印怎么弄?用这款软件很方便

在工作中&#xff0c;我们经常需要将PDF文件发送给他人&#xff0c;但无法保证文件内容不被窃取&#xff0c;因此需要添加水印来保证文件的安全性。如果你不知道如何给PDF文件添加水印&#xff0c;以下两款软件可以帮助你轻松实现&#xff0c;一起来看看吧&#xff01; 方法一&…

ELK搭建

ELK介绍&#xff1a; ELK是一组开源工具的缩写&#xff0c;它由Elasticsearch、Logstash和Kibana三个组件组成&#xff0c;用于处理、分析和可视化大量日志数据。 入门级ELK搭建&#xff08;无Docker环境&#xff09; 安装前准备 1.获取安装包 https://artifacts.elastic…

C语言进阶之通讯录的实现(静态版和动态版)以及动态内存管理

通讯录的实现及动态内存管理 1.通讯录实现要求2.静态版通讯录实现2.1 contact.h文件实现2.2 contact.c文件实现2.3 main.c文件实现2.4 静态版通讯录全部文件代码 3.动态内存管理3.1 为什么存在动态内存分配3.2 动态内存函数的介绍3.3 常见的动态内存错误3.4 C/C程序的内存开辟3…

微服务系列文章之 nginx负载均衡

nginx负载均衡 负载均衡建立在现有网络结构之上&#xff0c;提供了一种廉价有效透明的方法扩展网络设备和服务器的带宽&#xff0c;增加吞吐量、加强网络数据处理能力、提高网络的灵活性和可用性。 随着网站的发展&#xff0c;服务器压力越来越大&#xff0c;我们可能首先会将数…

Wi-Fi 相关概念

Wi-Fi 相关概念 802.11 Wi-Fi 标准及其含义频宽 和 带宽 的概念20MHz与40MHz的区别2.4G 频段2.4G的频道编号和中心频率 5G WiFi 频段中国开放的的5G WiFi频段&#xff1a;5G的频道编号和中心频率&#xff1a; 802.11历史进程一、802.11重要发展二、802.11协议族 参考 802.11 Wi…

2023下半年软考高级系统架构设计师怎么报名?

软考高级系统架构设计师报名时间&#xff1a; 广西2023下半年软考高级系统架构设计师报名时间&#xff1a;8月15日8:00至8月24日17:00 广东2023下半年软考高级系统架构设计师报名时间&#xff1a;8月16日9:00-8月24日17:00 甘肃2023下半年软考高级系统架构设计师报名时间&am…

AcWing 107. 超快速排序—逆序对

问题链接: AcWing 107. 超快速排序 问题描述 分析 这道题考查的算法不难&#xff0c;就只是利用归并排序来求逆序对的数量&#xff0c;但是主要是如何分析问题&#xff0c;如何能从问题中看出来和逆序对数量有关&#xff0c;现在的题目基本上很少是那种模板算法题了&#xff…

漏洞复现 || TerraMaster TOS exportUser.php 远程命令执行

阅读须知 技术文章仅供参考,此文所提供的信息只为网络安全人员对自己所负责的网站、服务器等(包括但不限于)进行检测或维护参考,未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成的直接或间接后果和损失,均由使用者本人负责。…

大坝安全监测中需要做好检查监测

大坝安全监测是人们了解大坝运行状态和安全状况的有效手段和方法。它的目的主要是了解大坝安全状况及其发展态势&#xff0c;是一个包括由获取各种环境、水文、结构、安全信息到经过识别、计算、判断等步骤&#xff0c;最终给出一个大坝安全 程度的全过程。 此过程包括&#xf…

SpringCloud Alibaba——Nacos服务领域模型

目录 一、Nacos服务领域模型二、Nacos服务领域模型图解 一、Nacos服务领域模型 模型名称解释Namespace实现环境隔离&#xff0c;默认值publicGroup不同的service可以组成一个Group&#xff0c;默认值Default-GroupService服务名称Cluster对指定的微服务虚拟划分&#xff0c;默…

Linux自主学习 - 多线程的创建(#include<pthread.h>)

备注&#xff1a;vscode通过ssh连接虚拟机中的ubuntu&#xff0c;ubuntu-20.04.3-desktop-amd64.iso 函数pthread_create() // pthread.h中的函数pthread_create()extern int pthread_create (pthread_t *__restrict __newthread, // 线程标识符const pthread_attr_t *…

python 面向对象编程

文章目录 前言如何理解面向对象编程在 python 中如何使用面向对象编程定义类创建对象self添加和获取对象属性添加属性类外添加属性类中添加属性 访问属性类外访问属性类中访问属性 魔法方法__ init __() 方法__ str __()方法__ del __() 方法 前言 大家好&#xff0c;前面我们…