RDD编程初级实践

参考链接

spark入门实战系列--8MLlib spark 实战_mob6454cc68310b的技术博客_51CTO博客icon-default.png?t=N7T8https://blog.51cto.com/u_16099212/7454034

Spark和Hadoop的安装-CSDN博客icon-default.png?t=N7T8https://blog.csdn.net/weixin_64066303/article/details/138021948?spm=1001.2014.3001.5501

1. spark-shell交互式编程

启动spark-shell

cd /usr/local/spark/
./bin/spark-shell

1.1 该系总共有多少学生

注:我将下载的chapter5-data1.txt文件放在“/home/hadoop/下载”目录下。

val lines = sc.textFile("file:///home/hadoop/下载/chapter5-data1.txt")  #读取文件
lines.map(row=>row.split(",")(0)).distinct().count  #每一行作为一个字符串,用’,’分割,取第一个元素,distinct去重,count统计有多少数据项

1.2 该系共开设来多少门课程

lines.map(row=>row.split(",")(1)).distinct().count   #去第二个元素,去重,统计元素数量

1.3 Tom同学的总成绩平均分是多少

lines.filter(row=>row.split(",")(0)=="Tom")    #以','作为分隔符,用filter进行过滤,筛选出第一项是“Tom”的数据项.map(row=>(row.split(",")(0),row.split(",")(2).toInt))    #把第一项和第三项(姓名+成绩)合在一起构成一个数据项.mapValues(x=>(x,1))    #去除value,把x变成(x,1),第一项是原始数据,第二项是数字1.reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))    #针对想对的Key(也就是姓名),来进行运行,运算规则是(x.1+y._1),表示求和,也就是对(x,1)分别进行求和.mapValues(x=>(x._1/x._2)).collect()  #求平均值运算,x._1是原始数据的求和,x._2是1的求和,表示数据项的个数

读取的是字符串,所以需要转Int .

1.4 求每名同学的选修的课程门数

lines.map(row=>(row.split(",")(0),1)).reduceByKey((x,y)=>x+y).collect 

首先是将数据变成(姓名,1)的map,然后针对相同key(姓名)的数据进行求和,也就是统计数据项的个数。 

1.5 该系DataBase课程共有多少人选修

lines.filter(row=>row.split(",")(1)=="DataBase").count #直接是筛选第二项(课程)是DataBase的数据,然后进行统计个数

1.6 各门课程的平均分是多少

lines.map(row=>(row.split(",")(1),row.split(",")(2).toInt)).mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect()

 求平均分的部分和前面是保持一致的,区别就是筛选构成map的时候前面是根据“Tom”来划分,现在是根据第二项的课程来进行划分。

1.7 使用累加器计算共有多少人选了DataBase这门课

val acc=sc.longAccumulator("My Accumulator")    #定义一个累加器
# #筛选第二项是DataBase的数据项,构成一个(DataBase,1)的map,用foreach,对values值来进行累加
lines.filter(row=>row.split(",")(1)=="DataBase").map(row=>(row.split(",")(1),1)).values.foreach(x=>acc.add(x))
#输出累加值
acc.value

2. 编写独立应用程序实现数据去重

2.1创建相关项目

sudo mkdir -p /example/sparkapp4/src/main/scala
cd /example/sparkapp4/src/main/scala
sudo touch A.txt
sudo vim A.txt
sudo touch B.txt
sudo vim B.txt

sudo vim SimpleApp.scala
import java.io.FileWriter
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConfobject SimpleApp {def main(args: Array[String]): Unit = {//配置val conf = new SparkConf().setAppName("Simple Application")val sc = new SparkContext(conf)//读取文件A.txtval A = sc.textFile("file:///example/sparkapp4/src/main/scala/A.txt")//读取文件B.txtval B = sc.textFile("file:///example/sparkapp4/src//main/scala/B.txt")//对两个文件进行合并val C = A ++ B//1.用distinct进行去重//2.以空格来进行分割//3.根据key排序val distinct_lines = C.distinct().map(row => (row.split("    ")(0), row.split("    ")(1))).sortByKey()//将RDD类型的数据转换为数组val result = distinct_lines.collect()//将结果输出到C.txt中val out = new FileWriter("/example/sparkapp4/src/main/scala/C.txt", true)for (item <- result) {out.write(item + "\n")println(item)}out.close()}
}

 2.2创建.sbt文件

cd /example/sparkapp4
sudo touch build.sbt
sudo vim build.sbt
name := "Simple Project"
version := "1.0"
scalaVersion := "2.13.13"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"

 2.3打包执行

 出现Exception in thread "main" java.io.FileNotFoundException:/example/sparkapp4/src/main/scala/C.txt (权限不够)

切换到root用户:su root

他这个空格我还是粘贴的,如果代码只有一个空格分割他的结果第二个数据是空的。

sudo /usr/local/sbt/sbt package
su root
spark-submit --class "SimpleApp" ./target/scala-2.13/simple-project_2.13-1.0.jar

3. 编写独立应用程序实现求平均值问题

3.1创建相关文件

sudo mkdir -p /example/sparkapp5/src/main/scala
cd /example/sparkapp5/src/main/scala
sudo vim Algorithm.txt
sudo vim Database.txt
sudo vim Python.txt

vim ./src/main/scala/SimpleApp.scala
import java.io.FileWriter
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConfobject SimpleApp {def main(args: Array[String]): Unit = {//配置val conf = new SparkConf().setAppName("Simple Application")val sc = new SparkContext(conf)//读取文件Algorithm.txtval Algorithm = sc.textFile("file:///example/sparkapp5/src//main/scala/Algorithm.txt")//读取文件Database.txtval Database = sc.textFile("file:///example/sparkapp5/src//main/scala/Database.txt")//读取文件Python.txtval Python = sc.textFile("file:///example/sparkapp5/src//main/scala/Python.txt")//对三个文件进行整合val scoreSum = Algorithm ++ Database ++ Python//以空格切割将名字作为key,(成绩,1)作为valueval student_grade = scoreSum.map(row => (row.split(" ")(0), (row.split(" ")(1).toInt, 1)))//求平均分数val student_ave = student_grade.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).map(x => (x._1, 1.0 * x._2._1 / x._2._2))//将RDD类型的数据转化为数组val result = student_ave.collect()val out = new FileWriter("/example/sparkapp5/src/main/scala/average.txt", true)for (item <- result) {out.write(item + "\n")println(item)}out.close()}
}

2.2创建.sbt文件

如上同

2.3打包执行

如上同

题目要求要保留两位小数,我找的那个没有保留小数,我目前写的这个小数后面不止两位。

写入文件采用的是追加的方式。

 补:

还是解决了,先写简单的程序调试,然后直接替换。

 刚开始想的不对,直接用的是Array,结果不出意外报错了。

object Test {def main(args: Array[String]): Unit = {var a = Array("feng", 12.355353)println(a)println(a(0))println(a(1))println(a(1).formatted("%.2f"))printf("%s %.2f\n", a(0), a(1))}
}

 因为需要格式化输出的是一个Map,不是Array,所以需要修改代码。

[Ljava.lang.Object;@43a25848
feng
12.355353
12.36
feng 12.36
object Test {def main(args: Array[String]): Unit = {var map = Map[String, Double]("feng" -> 12.442424, "xi" -> 13.35262, "ze" -> 23.151425)for (elem <- map) {println(elem)}for ((key, value) <- map) {val roundedValue = BigDecimal(value).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDoubleprintln(s"($key,$roundedValue)")}}
}
(feng,12.442424)
(xi,13.35262)
(ze,23.151425)
(feng,12.44)
(xi,13.35)
(ze,23.15)

 之后就是直接替换原始的代码就行了。

import java.io.FileWriter
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConfobject SimpleApp {def main(args: Array[String]): Unit = {//配置val conf = new SparkConf().setAppName("Simple Application")val sc = new SparkContext(conf)//读取文件Algorithm.txtval Algorithm = sc.textFile("file:///example/sparkapp5/src//main/scala/Algorithm.txt")//读取文件Database.txtval Database = sc.textFile("file:///example/sparkapp5/src//main/scala/Database.txt")//读取文件Python.txtval Python = sc.textFile("file:///example/sparkapp5/src//main/scala/Python.txt")//对三个文件进行整合val scoreSum = Algorithm ++ Database ++ Python//以空格切割将名字作为key,(成绩,1)作为valueval student_grade = scoreSum.map(row => (row.split(" ")(0), (row.split(" ")(1).toInt, 1)))//求平均分数val student_ave = student_grade.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).map(x => (x._1, 1.0 * x._2._1 / x._2._2))//将RDD类型的数据转化为数组val result = student_ave.collect()val out = new FileWriter("/example/sparkapp5/src/main/scala/average.txt", true)/* for (item <- result) {out.write(item + "\n")println(item)}*/for ((key, value) <- result) {val roundedValue = BigDecimal(value).setScale(2, BigDecimal.RoundingMode.HALF_UP).toDoubleout.write(s"($key,$roundedValue)\n")println(s"($key,$roundedValue)")}out.close()}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/652015.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【python】语言学习笔记--用来记录总结

请问以下变量哪些是tuple类型&#xff1a; a ()b (1)c [2]d (3,)e (4,5,6)answer在Python中&#xff0c;元组&#xff08;tuple&#xff09;是由逗号分隔的一组值组成的有序序列&#xff0c;通常用圆括号括起来。让我们逐个检查变量&#xff0c;看哪些是元组类型&#xff…

C#基础之冒泡排序

排序初探 文章目录 冒泡排序1、概念2、冒泡排序的基本原理3、代码实现思考1 随机数冒泡排序思考2 函数实现排序 冒泡排序 1、概念 将一组无序的记录序列调整为有序的记录序列&#xff08;升、降序&#xff09; 2、冒泡排序的基本原理 两两相邻&#xff0c;不停比较&#x…

海外仓精细化管理方法:ABC库存分析,大幅提升仓库有效利用率

ABC分析是海外仓管理的一种比较有效的方法&#xff0c;主要是帮助评估库存产品对仓库的价值量大小。这是一种根据需求、成本和风险数据等因素综合进行的评估&#xff0c;通过评估&#xff0c;仓管员可以更有效的组织和安排仓库产品&#xff0c;提升仓库的有效利用率&#xff0c…

微信小程序:11.本地生活小程序制作

开发工具&#xff1a; 微信开发者工具apifox进行创先Mock 项目初始化 新建小程序项目输入ID选择不使用云开发&#xff0c;js传统模版在project.private.config中setting配置项中配置checkinalidKey&#xff1a;false 梳理项目结构 因为该项目有三个tabbar所以我们要创建三…

交换排序-冒泡排序 快速排序

目录 3.1 冒泡排序 3.2 快速排序 Hoare版本快速排序 挖坑法快速排序 前后指针法快速排序 快速排序优化-三数取中法 快速排序非递归 3.1 冒泡排序 思想&#xff1a;升序情况下&#xff1a;左边大于右边就进行交换&#xff0c;每一次把最大的放在最后一位。 void Swap(int…

微信小程序:6.事件

什么事事件 事件就是渲染层到逻辑层的通讯方式&#xff0c;比如提交表单&#xff0c;按钮点击都可以看作一个事件。 小程序中常用的事件 事件对象属性列表 当事件回调时&#xff0c;会收到一个事件对象event&#xff0c;他详细属性如夏表所示&#xff1a; target和curren…

web(微博发布案例)

示例&#xff1a; 1、检测空白内容 2、发布内容 html: <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta …

本地Windows主机,使用pycharm通过wsl的ubuntu来创建django项目

Windows主机在pycharm中通过wsl的ubuntu来创建django项目 需求&#xff1a;在windows主机中创建python项目再转接到linux服务器中运行&#xff0c;有点麻烦。【特别是存放日志文件或其他文件路径时需要修改为linux中的路径】 1&#xff1a;我的是windows主机 2&#xff1a;有…

【uniapp/ucharts】采用 uniapp 框架的 h5 应用使用 ucharts(没有 uni_modules)

这种情况无法直接从 dcloud 平台上一键下载导入&#xff0c;所以应该在官网推荐的 git 仓库去单独下载&#xff1a; https://gitee.com/uCharts/uCharts/tree/master/uni-app/uCharts-%E7%BB%84%E4%BB%B6/qiun-data-charts(%E9%9D%9Euni_modules) 下载的文件是如图所示的路径&…

跳出框架:Facebook的创新策略与社交影响

1. 引言 在数字化时代&#xff0c;社交媒体如同一面镜子&#xff0c;反映出我们社会的多元性和变革。Facebook&#xff0c;作为这面镜子中最明亮的一个&#xff0c;不仅改变了人们的日常生活&#xff0c;更深刻地塑造了社交、文化和经济的面貌。本文将深入探讨Facebook的创新策…

DRF JWT认证进阶

JWT认证进阶 【0】准备工作 &#xff08;1&#xff09;模型准备 模型准备&#xff08;继承django的auth_user表&#xff09; from django.db import models from django.contrib.auth.models import AbstractUserclass UserInfo(AbstractUser):mobile models.CharField(ma…

Grafana系列 | Grafana监控TDengine库数据 |Grafana自定义Dashboard

开始前可以去grafana官网看看dashboard文档 https://grafana.com/docs/grafana/latest/dashboards 本文主要是监控TDengine库数据 目录 一、TDengine介绍二、Grafana监控TDengine数据三、Grafana自定义Dashboard 监控TDengine库数据1、grafana 变量2、添加变量3、配置panel 一…