Spark Core

Spark Core

一、Spark RDD

RDD概述

1.RDD基础

在这里插入图片描述

2.RDD源代码描述

在这里插入图片描述

3.RDD特性

在这里插入图片描述

4.Spark宽窄依赖

在这里插入图片描述

RDD创建

在驱动器中创建RDD
在这里插入图片描述

1.parallelize

在这里插入图片描述

读取外部数据集创建RDD

2.textFile

在这里插入图片描述
在这里插入图片描述

RDD操作

在这里插入图片描述

缓存rdd到内存
在这里插入图片描述

1.RDD转化操作

在这里插入图片描述

2.常见的转化操作

在这里插入图片描述

3.RDD行动操作

在这里插入图片描述

4.常见的行动操作

在这里插入图片描述

Spark传递函数

在这里插入图片描述

在这里插入图片描述

RDD惰性求值与持久化(缓存)

1.惰性求值

在这里插入图片描述

2.缓存方法
cache()
persist()

移除缓存

unpersist()
3.持节化级别

在这里插入图片描述

RDD练习

Test-01
scala>val rdd1=sc.parallelize(List(5,6,4,2,9,7,8,1,10))scala>val rdd2=rdd1.map(x=>x*2)scala>val sort=rdd2.sortBy(x=>x,true)scala>val rdd3=sort.filter(_>=10)scala>rdd3.collect()

在这里插入图片描述

Test-02
scala>val rdd1=sc.parallelize(Array("a b c","d e f","h i j"))scala>val rdd2=rdd1.flatMap(_.split(" "))scala>rdd2.collect()

在这里插入图片描述

Test-03

RDD并集、交集和去重操作

scala>val rdd1=sc.parallelize(List(1,2,3,4))scala>val rdd2=sc.parallelize(List(5,6,4,3))scala>val rdd3=rdd1.union(rdd2)scala>rdd3.collect()scala>val rdd4=rdd1.intersection(rdd2)scala>rdd4.collect()scala>rdd3.distinct.collect

在这里插入图片描述

Test-04

RDD求并集并按照Key进行分组

scala>val rdd1=sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))scala>val rdd2=sc.parallelize(List(("jerry",2),("tom",1),("shuke",2)))scala>val rdd3=rdd1.join(rdd2)scala>rdd3.collect()scala>val rdd4=rdd1.union(rdd2)scala>rdd4.groupByKey.collect

在这里插入图片描述

Test-05

RDD聚合

scala>val rdd1=sc.parallelize(List(1,2,3,4,5))scala>val rdd2=rdd1.reduce(_+_)scala>rdd2

在这里插入图片描述

Test-06

RDD按照Key进行聚合,按照value进行排序

("tom",1)把值1作为key(t=>(t._2,t._1)) ====(1,"tom")最后再反转输出
scala>val rdd1=sc.parallelize(List(("tom",1),("kitty",2),("shuke",1)))scala>val rdd2=sc.parallelize(List(("jerry",2),("tom",3),("shuke",2),("kitty",5)))scala>val rdd3=rdd1.union(rdd2)scala>val rdd4=rdd3.reduceByKey(_+_)scala>rdd4.collect()scala>val rdd5=rdd4.map(t=>(t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1))scala>rdd5.collect()

在这里插入图片描述

二、Spark RDD键值对操作

Pair RDD(键值对)

1.概念

在这里插入图片描述

2.创建方式
A.直接在程序中创建
B.将普通的RDD转化为Pair RDD(K,V)=>Pair RDD
C.数据格式存储为键值对格式的读取(json)
3.创建Pair RDD-三种语言

在这里插入图片描述

file.txtHadoop HDFS MapReduce
Spark Core SQL Streaming

在Scala中使用第一个单词为键创建一个Pair RDD

package com.saddam.spark.SparkCoreimport org.apache.spark.{SparkConf, SparkContext}object PairRDD {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local[2]").setAppName("pair rdd")val sc=new SparkContext(conf)val lines = sc.textFile("D:\\Spark\\DataSets\\file.txt")val pairs=lines.map(x=>(x.split(" ")(0),x))pairs.foreach(p=>{println(p._1+":"+p._2)})/*Hadoop:Hadoop HDFS MapReduceSpark:Spark Core SQL Streaming*/sc.stop()}
}

在Java中使用第一个单词为键创建一个Pair RDD

在这里插入图片描述

在Python中使用第一个单词为键创建一个Pair RDD

在这里插入图片描述

4.Pair RDD转化操作

在这里插入图片描述

元素过滤

对第二个元素进行筛选

a.读取文件生成rdd
b.将rdd转化为pair rdd
c.使用filter进行过滤

在这里插入图片描述

聚合操作

在这里插入图片描述

package com.saddam.spark.SparkCoreimport org.apache.spark.{SparkConf, SparkContext}object PairRDD_聚合操作 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local[2]").setAppName("pair rdd")val sc=new SparkContext(conf)val rdd=sc.parallelize(List(("panda",0),("pink",3),("pirate",3),("panda",1),("pink",4)))val rdd1=rdd.mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))rdd1.foreach(x=>{println(x._1+":"+x._2._1/x._2._2)})}}panda:0
pirate:3
pink:3
数据分组

在这里插入图片描述

package com.saddam.spark.SparkCoreimport org.apache.spark.{SparkConf, SparkContext}object PairRDD_数据分组 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local[2]").setAppName("pair rdd")val sc=new SparkContext(conf)val rdd=sc.parallelize(List(("panda",0),("pink",3),("pirate",3),("panda",1),("pink",4)))val rdd1=rdd.groupByKey()//持久化val rdd2 = rdd1.persist()//    println(rdd2.first()._1+":"+rdd2.first()._2)rdd2.foreach(x=>{println(x._1+":"+x._2)})}}pirate:CompactBuffer(3)
panda:CompactBuffer(0, 1)
pink:CompactBuffer(3, 4)
连接操作

在这里插入图片描述

package com.saddam.spark.SparkCoreimport org.apache.spark.{SparkConf, SparkContext}object PairRDD_连接操作 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local[2]").setAppName("pair rdd")val sc=new SparkContext(conf)val rdd1=sc.parallelize(List(("frank",30),("bob",9),("silly",3)))val rdd2=sc.parallelize(List(("frank",88),("bob",12),("marry",22),("frank",21),("bob",22)))//内连接val rdd3=rdd1.join(rdd2)rdd3.foreach(x=>{println(x._1+":"+x._2)})/*
frank:(30,88)
bob:(9,12)
frank:(30,21)
bob:(9,22)*///左外连接val rdd4=rdd1.leftOuterJoin(rdd2)rdd4.foreach(x=>{println(x._1+":"+x._2)})/*
silly:(3,None)
frank:(30,Some(88))
frank:(30,Some(21))
bob:(9,Some(12))
bob:(9,Some(22))*///右外连接val rdd5=rdd1.rightOuterJoin(rdd2)rdd5.foreach(x=>{println(x._1+":"+x._2)})/*
frank:(Some(30),88)
frank:(Some(30),21)
marry:(None,22)
bob:(Some(9),12)
bob:(Some(9),22)*///全连接val rdd6=rdd1.fullOuterJoin(rdd2)rdd6.foreach(x=>{println(x._1+":"+x._2)})/*
frank:(Some(30),Some(88))
frank:(Some(30),Some(21))
silly:(Some(3),None)
marry:(None,Some(22))
bob:(Some(9),Some(12))
bob:(Some(9),Some(22))*/}
}
数据排序

在这里插入图片描述

package com.saddam.spark.SparkCoreimport org.apache.spark.{SparkConf, SparkContext}object PairRDD_数据排序 {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local[2]").setAppName("pair rdd")val sc=new SparkContext(conf)var rdd=sc.parallelize(List(("frank",30),("bob",9),("silly",3)))//排序//按照字典序val rdd2=rdd.sortByKey()rdd2.collect().foreach(println)/*
(bob,9)
(frank,30)
(silly,3)*///按照值排序val rdd3=rdd.map(x=>(x._2,x._1)).sortByKey().map(x=>(x._2,x._1)) //sortByKey(false)-降序rdd3.collect().foreach(println)
/*
(silly,3)
(bob,9)
(frank,30)*/}
}
5.Pair RDD行动操作

在这里插入图片描述

6.数据分区
数据分区原因

在这里插入图片描述

数据分区操作

在这里插入图片描述

示例:PageRank算法

计算网页价值
在这里插入图片描述

package com.saddam.spark.SparkCoreimport org.apache.spark.{HashPartitioner, SparkConf, SparkContext}object PageRank {def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local[2]").setAppName("pair rdd")val sc=new SparkContext(conf)val links=sc.parallelize(List(("A",List("B","C")),("B",List("A","C")),("C",List("A","B","D")),("D",List("C")))).partitionBy(new HashPartitioner(10)).persist()var ranks = links.mapValues(v => 1.0)for (i<-0 until 10){val contributions=links.join(ranks).flatMap{case (pageId,(links,rank))=>links.map(dest=>(dest,rank/links.size))}ranks=contributions.reduceByKey((x,y)=>x+y).mapValues(v=>0.15+0.85*v)}ranks.sortByKey().collect().foreach(println)ranks.saveAsTextFile("D:\\Spark\\OutPut\\PageRank")}
/*
(A,0.9850243302878132)
(B,0.9850243302878132)
(C,1.4621033282930214)
(D,0.5678480111313515)*/
}
7.宽窄依赖
概念

在这里插入图片描述

窄依赖优点

在这里插入图片描述

宽窄依赖的使用

在这里插入图片描述

三、Spark数据读取与保存

Spark数据源

在这里插入图片描述

概述及分类

在这里插入图片描述

文件格式

在这里插入图片描述

文件系统

在这里插入图片描述

Spark SQL中的结构化数据

在这里插入图片描述

Spark访问数据库系统

1.MySQL数据库连接

在这里插入图片描述

1.代码实战
package com.saddam.spark.SparkCoreimport java.sql.{DriverManager, ResultSet}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.JdbcRDD/*** Spark访问MySQL数据库*/
object SparkConnectMySQL {//第一步:定义函数:创建一个连接连接数据库def createConnection()={Class.forName("com.mysql.jdbc.Driver").newInstance()DriverManager.getConnection("jdbc:mysql://localhost:3306/saddam","root","324419")}//第二步:定义一个函数:映射def extractValues(r:ResultSet)={(r.getInt(1),r.getString(2))}def main(args: Array[String]): Unit = {val conf=new SparkConf().setMaster("local[2]").setAppName("pair")val sc=new SparkContext(conf)//第三步:创建RDD读取数据val data=new JdbcRDD(sc,createConnection,"select * from users where ?<=id and id<=?",lowerBound = 1,upperBound = 3,numPartitions = 2,mapRow = extractValues)data.foreach(println)sc.stop()}}(1,admin)
(1,zhangsan)
(1,lisi)
2.问题发现与解决
javax.net.ssl.SSLException MESSAGE: 
closing inbound before receiving peer's close_notify解决:useSSL=false
jdbc:mysql://localhost:3306/saddam?useSSL=false
2.Cassandra连接

开源的NoSQL数据库

在这里插入图片描述

3.HBase连接

在这里插入图片描述

四、Spark编程进阶

Spark的高级特性累加器广播变量基于分区的操作与外部程序间的广播数值RDD操作

Spark的高级特性

在这里插入图片描述

累加器

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/520290.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HTTP协议(请求方式,响应方式,请求行、头、体,状态码)是热点面试题【详解】

目录 1. HTTP简介 1.介绍 2.浏览器抓包 3.特点 2. HTTP请求 1.HTTP请求的格式 2.HTTP请求方式 3.GET方式的请求示例 请求行 请求头 请求体 4.POST方式的请求示例 请求行 请求头 请求体 GET和POST的区别 5.HTTP响应 1.HTTP响应的格式 2 常见响应头 3 响应…

企业财务分析该怎么做?重点分析哪些财务指标?

在企业经营管理的过程中&#xff0c;财务分析是评估当前企业或特定部门财务状况和绩效的过程&#xff0c;这一过程通常涉及对财务报表&#xff08;如资产负债表、利润表和现金流量表&#xff09;进行定量和定性的评估&#xff0c;以便为盈利能力、偿债能力、现金流动性和资金稳…

【计算机系统】2.进程管理

【计算机系统】2.进程管理 这个章节十分的重要&#xff0c;作业也要好好做&#xff0c;因为我学的是后端&#xff0c;学计算机进程的处理对于搞并发来说十分有用。 提出问题 6、试从动态性、并发性和独立性上比较进程和程序。19、为什么要在OS中引入线程?A.请用信号量解决以下…

如何搭建财务数据运营体系:基于财务五力模型的分析

在当今复杂多变的商业环境中,财务数据作为企业决策的重要参考依据,其运营体系的搭建显得尤为关键。一个健全、高效的财务数据运营体系不仅能够为企业提供准确的财务数据支持,还能帮助企业在激烈的市场竞争中保持领先地位。基于财务五力模型的分析,我们可以从收益力、安定力…

第三方软件测试报告有效期是多久?专业软件测试报告获取

第三方软件测试报告是在软件开发过程中&#xff0c;由独立的第三方机构对软件进行全面测试和评估后发布的报告。这些第三方机构通常是与软件开发商和用户无关的专业技术机构&#xff0c;具备丰富的测试经验和专业知识。    第三方测试报告具有以下几个好处&#xff1a;   …

c++ 11 新特性 不同数据类型之间转换函数之const_cast

一.不同数据类型之间转换函数const_cast介绍 const_cast是C11中引入的一种类型转换操作符&#xff0c;用于修改类型的const或volatile属性。const_cast的主要用途是移除对象的常量性&#xff0c;它是唯一具有此能力的C风格的转型操作符。在C11中&#xff0c;const_cast可以完成…

YoloV8改进策略:Block改进|自研Block,涨点超猛|代码详解|附结构图

涨点效果 涨点效果:在我自己的数据集上,mAP50 由0.986涨到了0.993,mAP50-95由0.737涨到0.757,涨点明显! 参考模型 参考的Block,如下图: 我对Block做了修改,修改后的结构图如下: 代码详解 from timm.models.layers import DropPathfrom torch import Tensor def …

黑马java-JavaWeb-MySQL基本操作

1.JavaWeb&#xff1a; 用java技术来解决相关web互联网领域的技术栈 2.数据库&#xff1a; 存储数据的仓库&#xff0c;数据是有组织的进行存储 英文&#xff1a;DataBase&#xff0c;简称DB 3.数据库管理系统&#xff1a; 管理数据库的大型软件 英文&#xff1a;DataBase Mana…

链表|707.设计链表

力扣题目链接 typedef struct MyLinkedList {int val;struct MyLinkedList* next; }MyLinkedList;/** Initialize your data structure here. */MyLinkedList* myLinkedListCreate() {//这个题必须用虚拟头指针,参数都是一级指针,头节点确定后没法改指向了!!!MyLinkedList* he…

力扣大厂热门面试算法题 - 矩阵

解数独&#xff0c;单词搜索&#xff0c;被围绕的区域。每题做详细思路梳理&#xff0c;配套Python&Java双语代码&#xff0c; 2024.03.07 可通过leetcode所有测试用例。 目录 37. 解数独 解题思路 完整代码 Python Java 79. 单词搜索 解题思路 完整代码 Python…

.Net6使用JWT认证和授权

文章目录 目的实现案例一.项目所需包&#xff1a;二.配置项目 appsettings.json 文件&#xff1a;三.创建Model文件夹&#xff0c;添加AppConfig类和UserRole类1.AppConfig类获取appsettings.json文件中的值2.UserRole类用于区分用户信息和权限 四.主体代码案例&#xff1a;1.L…

软考66-上午题-【面向对象技术】-小结+杂题

一、杂题 真题1&#xff1a; 真题2&#xff1a; 真题4&#xff1a; 真题5&#xff1a; 真题6&#xff1a; 二、面向对象设计-总结 2-1、考题分析 选择题&#xff1a;11道&#xff08;11分&#xff09; 综合分析题&#xff1a;2道&#xff08;30分&#xff09; java程序设计…