RDD入门——RDD 代码

创建RDD

程序入口 SparkContext

val conf = new SparkConf().setMaster("local[2]").setAppName(spark_context")
val sc: SparkContext = new SparkContext(conf)
  • SparkContext 是 spark-core 的入口组件,是一个 Spark 程序的入口,在 Spark 0.x 版本就已经存在 SparkContext 了,是一个元老级的API

  • 如果把一个 Spark 程序分为前后端,那么服务就是可以运行 Spark 程序的集群,而 Driver 就是 Spark 的前端,

    在 Driver 中, SparkContext 是最主要的组件,也是 Driver 在运行时首先会创建的组件,是 Driver 的核心

  • SparkContext 从提供的 API 来看,主要作用是连接集群,创建 RDD , 累加器,广播变量等

简略的说,RDD 有三种创建方式

  • RDD 可以通过本地集合创建RDD
  • RDD 可以通过外部数据创建RDD
  • RDD 可以通过其它的RDD衍生新的RDD

 

通过本地集合创建RDD

@Test
def rddCreationLocal(): Unit = {val seq = Seq("Hello1", "Hello2", "Hello3") // 里面数据是什么类型val rdd1:RDD[String] = sc.parallelize(seq, 2) // RDD的里面的泛型就是什么类型, 指定为2个分区数sc.parallelize(seq) // parallelize 可以不指定分区数val rdd2: RDD[String] = sc.makeRDD(seq,2)
}

通过外部数据创建RDD

@Test
def rddCreationFiles(): Unit = {sc.textFile("hdfs:///.....")// 1. textFile中传入的是什么//    * 传入的是一个路径,读取路径//    * hdfs://   file://   /.../...(这种方式分为在集群中执行还是在本地中执行,如果在集群中,读的是hdfs,本地读的是文件系统)// 2. textFile是否支持分区?//    * 假如传入的path是 hdfs://....//    * 分区是由 HDFS 中文件的 block 决定的// 3. textFile支持什么平台//    * 支持 aws 和 阿里云
}

通过其它的RDD衍生新的RDD

@Test
def rddCreateFromRDD(): Unit = {val rdd1 = sc.parallelize(Seq(1, 2, 3))// 通过在 rdd 上执行算子操作,会生成新的 rdd// 原地计算// java中,str.substr 返回新的字符串,非原地计算// 通过rdd1.map 的操作 ,和字符串中的方式很像, 字符串可变吗?不可变// RDD 可变吗? 不可变val rdd2: RDD[Int] = rdd1.map(item => item)
}

RDD的算子操作

  • map 算子

    # spark-shell
    sc.parallelize(Seq(1, 2, 3)).map( num => num * 10).collect()# IDEA
    @Test
    def mapTest(): Unit = {// 1. 创建RDDval rdd1 = sc.parallelize(Seq(1, 2, 3))// 2. 执行 map 操作val rdd2 = rdd1.map(item => item * 10)// 3. 得到结果val result:Array[Int] = rdd2.collect()result.foreach(item => println(item))// 关闭scsc.stop()
    }
    
    • 作用
      • 把 RDD 中的数据 一对一的转换为另一种形式
    • 调用
      • def map[U: ClassTag] (f: T ⇒ U) : RDD[U]
    • 参数
      • f → map 算子是 原 RDD → 新 RDD 的过程, 这个函数的参数是原 RDD 的数据, 返回值是经过函数转换的新 RDD 的数据
    • 注意点
      • map 是一对一, 如果函数是 String → Array[String]则新的 RDD 中每条数据就是一个数组
  • FlatMap算子

    # spark-shell
    sc.parallelize(Seq("Hello lily", "Hello lucy", "Hello tim")).flatMap( line => line.split(" ")).collect()# IDEA
    @Test
    def flatMapTest(): Unit = {// 1. 创建RDDval rdd1 = sc.parallelize(Seq("Hello lily", "Hello lucy", "Hello tim"))// 2. 执行 flatMap 操作val rdd2 = rdd1.flatMap( line => line.split(" "))// 3. 得到结果val result:Array[String] = rdd2.collect()result.foreach(line => (println(line)))// 关闭scsc.stop()
    }*
    
    • 作用
      • flatMap 算子和 map 算子类似, 但是 FlatMap 是一对多
    • 调用
      • def flatMap[U: ClassTag](f: T ⇒ List[U]): RDD[U]
    • 参数
      • f → 参数是原 RDD 数据, 返回值是经过函数转换的新 RDD 的数据, 需要注意的是返回值是一个集合, 集合中的数据会被展平后再放入新的 RDD
    • 注意点
      • flatMap 其实是两个操作, 是 map + flatten, 也就是先转换, 后把转换而来的 List 展开
      • flatMap 也是转换,他可以把数组和集合展开,并且flatMap中的函数一般也是集合或者数组
  • ReduceByKey算子

    # spark-shell
    sc.parallelize(Seq(("a",1), ("a", 1), ("b", 1))).reduceByKey( ( cur, agg) => cur + agg).collect()# IDEA
    @Test
    def reduceByKeyTest(): Unit = {// 1. 创建RDDval rdd1 = sc.parallelize(Seq("Hello lily", "Hello lucy", "Hello tim"))// 2. 处理数据val rdd2 = rdd1.flatMap( item => item.split(" ")).map(item => (item, 1)).reduceByKey( (cur, agg) => cur + agg)// 3. 得到结果val result:Array[(String, Int)] = rdd2.collect()result.foreach(item => (println(item)))// 4. 关闭scsc.stop()
    }
    
    • 作用
      • 首先按照 Key 分组, 接下来把整组的 Value 计算出一个聚合值, 这个操作非常类似于 MapReduce 中的 Reduce
    • 调用
      • def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]
    • 参数
      • func → 执行数据处理的函数, 传入两个参数, 一个是当前值, 一个是局部汇总, 这个函数需要有一个输出, 输出就是这个 Key 的汇总结果
    • 注意点
      • ReduceByKey 只能作用于 Key-Value 型数据, Key-Value 型数据在当前语境中特指 Tuple
      • ReduceByKey 是一个需要 Shuffled 的操作
      • 和其它的 Shuffled 相比, ReduceByKey是高效的, 因为类似 MapReduce 的, 在 Map 端有一个 Cominer, 这样 I/O 的数据便会减少
      • reduceByKey第一步是按照Key进行分组,然后对每一组进行聚合得到结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/328042.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

RocketMQ源码 发送顺序消息源码分析

前言 rocketmq 发送顺序消息和普通消息的主流程区别大部分一致的,区别在于:普通消息发送时,从所有broker的队列集合中 轮询选择一个队列,而顺序队列可以提供用户自定义消息队列选择器,从NameServer 分配的顺序 broker…

CRM市场营销管理功能,如何进行客户细分和数据分析?

CRM管理系统中的营销管理模块,它的锋芒常被销售管理所掩盖,但对于企业的业务来说同样重要。营销部门虽然不像销售人员一样直接面对客户,却是挖掘线索、商机的重要角色。CRM在市场营销领域的关键功能包括:营销漏斗、客户细分、营销…

二手买卖、废品回收小程序 在app.json中声明permission scope.userLocation字段 教程说明

处理二手买卖、废品回收小程序 在app.json中声明permission scope.userLocation字段 教程说明 sitemapLocation 指明 sitemap.json 的位置;默认为 ‘sitemap.json’ 即在 app.json 同级目录下名字的 sitemap.json 文件 找到app.json这个文件 把这段代码加进去&…

2024PMP考试新考纲-【人员领域】近期典型真题和超详细解析(5)

今天华研荟继续为您分享PMP新考纲下的【人员People领域】近年真题,帮助大家举一反三,一次性通过2024年的PMP考试。 2024年PMP考试新考纲-【人员领域】真题解析21 题:项目经理正在为一个项目工作。该项目由于人员流动,相关方登记册…

STM32入门教程-2023版【3-2】STM32如何使用库函数及几种方法

关注 点赞 不错过精彩内容 大家好,我是硬核王同学,最近在做免费的嵌入式知识分享,帮助对嵌入式感兴趣的同学学习嵌入式、做项目、找工作! 五、库函数的使用方法 (1)第一种 想使用库函数,可以先打开.h文件&…

C语言第三方库Melon开箱即用之词法分析器使用

之前的文章中,笔者介绍了Linux/UNIX C语言库Melon的基本功能及框架使用。 本文将介绍Melon中的词法分析器组件。 Melon的Github仓库为:https://github.com/Water-Melon/Melon 词法分析器在Melon中并不依赖于自身框架,因此可以在不初始化框…

2.3_7 生产者-消费者问题

2.3_7 生产者-消费者问题 系统中有一组生产者进程和一组消费者进程,生产者进程每次生产一个产品放入缓冲区,消费者进程每次从缓冲区中取出一个产品并使用。(注:这里的“产品”理解为某种数据) 生产者、消费者共享一个初始为空、大小为n的缓冲区。 只有缓冲区没满时,生产者才…

PHP在线sqlite转html表格小功能(sqlite2html)

6KB PHP实现在线sqlite转html表格小功能(支持大文件上传,得到一表一文件) 可自定义:上传限制大小;支持后缀格式!下载格式位压缩包,内含一表一个html文件。 作用:程序员实用工具,上传sqlite数据得到html表格数据供本地…

嵌入式培训机构四个月实训课程笔记(完整版)-Linux系统编程第四天-Linux管道(物联技术666)

更多配套资料CSDN地址:点赞+关注,功德无量。更多配套资料,欢迎私信。 物联技术666_嵌入式C语言开发,嵌入式硬件,嵌入式培训笔记-CSDN博客物联技术666擅长嵌入式C语言开发,嵌入式硬件,嵌入式培训笔记,等方面的知识,物联技术666关注机器学习,arm开发,物联网,嵌入式硬件,单片机…

课堂分享 | IT与OT是什么?

长期以来信息技术IT和操作运营技术OT是相互隔离的,随着大数据分析和边缘计算业务的对现场级实时数据的采集需求,IT和OT有了逐渐融合的趋势。IT与OT融合,它赋予工厂的管理者监控运行和过程的能力大为增强,甚至可以预测到可能发生的…

Leetcode2967. 使数组成为等数数组的最小代价

Every day a Leetcode 题目来源:2967. 使数组成为等数数组的最小代价 解法1:贪心 题目中要求将数组变成等数数组(数组中的所有元素都等于一个小于 109 的回文数)。因此,我们需要找到一个小于 109 的回文数&#xf…

Vue新手村(一)

目录 1、Vue简介——Vue的特点 2、Vue的第一个页面 3.Vue的简单使用介绍 3.1、{{ }}的使用 3.2、v-text和v-html 3.2.1、v-text和{{ }}的区别 3.2.2、v-html和v-text的区别 3.3、v-on【事件绑定】 3.3.1、绑定事件的语法 3.3.2、语法简化 3.3.3、传参 3.4、v-show和…