2023_Spark_实验十四:SparkSQL入门操作

1、将emp.csv、dept.csv文件上传到分布式环境,再用 

hdfs  dfs -put dept.csv /input/

hdfs  dfs -put emp.csv /input/

将本地文件put到hdfs文件系统的input目录下

2、或者调用本地文件也可以。区别:sc.textFile("file:///D:\\temp\\emp.csv")


import org.apache.spark.sql.SparkSessionimport org.apache.spark.sql.types._import spark.implicits._case classEmp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)val lines =sc.textFile("hdfs://Master:9000/input/emp.csv").map(_.split(","))val allEmp = lines.map(x=>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt))val allEmpDF = allEmp.toDFallEmpDF.show

  • StructType 是个case class,一般用于构建schema.

  • 因为是case class,所以使用的时候可以不用new关键字

构造函数

  • 可以传入Seq,List,Array,都是可以的~

  • 还可以用无参的构造器,因为它有一个无参的构造器.

例子

private val schema: StructType = StructType(List(StructField("name", DataTypes.StringType),StructField("age", DataTypes.IntegerType)))

也可以是

private val schema: StructType = StructType(Array(StructField("name", DataTypes.StringType),StructField("age", DataTypes.IntegerType)))

  • 还可以调用无参构造器,这么写

private val schema = (new StructType).add(StructField("name", DataTypes.StringType)).add(StructField("age", DataTypes.IntegerType))

  • 这个无参的构造器,调用了一个有参构造器.this里面是个方法,这个方法的返回值是Array类型,实际上就是无参构造器调用了主构造器

def this() = this(Array.empty[StructField])case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {}

import org.apache.spark.sql.types._val myschema =StructType(List(StructField("empno",DataTypes.IntegerType),StructField("ename",DataTypes.StringType),StructField("job",DataTypes.StringType),StructField("mgr",DataTypes.StringType),StructField("hiredate",DataTypes.StringType),StructField("sal",DataTypes.IntegerType),StructField("comm",DataTypes.StringType),StructField("deptno",DataTypes.IntegerType)))val empcsvRDD = sc.textFile("hdfs://Master:9000/input/emp.csv").map(_.split(","))import org.apache.spark.sql.Rowval rowRDD=empcsvRDD.map(line => Row (line(0).toInt,line(1),line(2),line(3),line(4),line(5).toInt,line(6),line(7).toInt))val df = spark.createDataFrame(rowRDD,myschema)

将people.json文件上传到分布式环境

hdfs  dfs -put people.json /inputhdfs  dfs -put emp.json /input

//读json文件

val df = spark.read.json("hdfs://Master:9000/input/emp.json")df.show

df.select ("ename").show

df.select($"ename").show

df.select($"ename",$"sal",$"sal"+100).show

df.filter($"sal">2000).show

df.groupBy($"deptno").count.show

df.createOrReplaceTempView("emp")

spark.sql("select * from emp").show

spark.sql("select * from emp where deptno=10").show

spark.sql("select deptno,sum(sal) from emp group by deptno").show


//1 创建一个普通的 view 和一个全局的 viewdf.createOrReplaceTempView("emp1")df.createGlobalTempView("emp2")//2 在当前会话中执行查询,均可查询出结果spark.sql("select * from emp1").showspark.sql("select * from global_temp.emp2").show//3 开启一个新的会话,执行同样的查询spark.newSession.sql("select * from emp1").show //运行出错spark.newSession.sql("select * from global_temp.emp2").show

//7、创建 Datasets//创建 DataSet,方式一:使用序列//1、定义 case classcase class MyData(a:Int,b:String)//2、生成序列,并创建 DataSetval ds = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS//3、查看结果ds.showds.collect


//创建 DataSet,方式二:使用 JSON 数据//1、定义 case classcase class Person(name: String, gender: String)//2、通过 JSON 数据生成 DataFrameval df = spark.read.json(sc.parallelize("""{"gender": "Male", "name": "Tom"}""":: Nil))//3、将 DataFrame 转成 DataSetdf.as[Person].showdf.as[Person].collect


//创建 DataSet,方式三:使用 HDFS 数据val linesDS = spark.read.text("hdfs://Master:9000/input/word.txt").as[String]val words = linesDS.flatMap(_.split(" ")).filter(_.length > 3)words.showwords.collect


val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x => x._1).countresult.showresult.orderBy($"value").show

1、将emp.json文件上传到分布式环境,再用 

hdfs  dfs -put emp.json /input/

将本地文件put到hdfs文件系统的input目录下


//8、Datasets 的操作案例//1.使用 emp.json 生成 DataFrameval empDF = spark.read.json("hdfs://Master:9000/input/emp.json")//查询工资大于 3000 的员工empDF.where($"sal" >= 3000).show//创建 case classcase classEmp(empno:Long,ename:String,job:String,hiredate:String,mgr:String,sal:Long,comm:String,deptno:Long)//生成 DataSets,并查询数据val empDS = empDF.as[Emp]//查询工资大于 3000 的员工empDS.filter(_.sal > 3000).show//查看 10 号部门的员工empDS.filter(_.deptno == 10).show//多表查询//1、创建部门表val deptRDD=sc.textFile("hdfs://Master:9000/input/dept.csv").map(_.split(","))case class Dept(deptno:Int,dname:String,loc:String)val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS//2、创建员工表case classEmp(empno:Int,ename:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptno:Int)val empRDD = sc.textFile("hdfs://Master:9000/input/emp.csv").map(_.split(","))val empDS = empRDD.map(x =>Emp(x(0).toInt,x(1),x(2),x(3),x(4),x(5).toInt,x(6),x(7).toInt)).toDS//3、执行多表查询:等值链接val result = deptDS.join(empDS,"deptno")//另一种写法:注意有三个等号val result = deptDS.joinWith(empDS,deptDS("deptno")===empDS("deptno"))//查看执行计划:result.explain

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/139213.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

药物滥用第二篇介绍

MTD: 美沙酮(Methadone),是一种有机化合物,化学式为C21H27NO,为μ阿片受体激动剂,药效与吗啡类似,具有镇痛作用,并可产生呼吸抑制、缩瞳、镇静等作用。与吗啡比较&#x…

物流监管:智慧仓储数据可视化监控平台

随着市场竞争加剧和市场需求的不断提高,企业亟需更加高效、智能且可靠的仓储物流管理方式,以提升企业的物流效率,减少其输出成本,有效应对市场上的变化和挑战。 图扑自研 HT for Web 产品搭建的 2D 智慧仓储可视化平台&#xff0c…

深入理解Huffman编码:原理、代码示例与应用

目录 ​编辑 介绍 Huffman编码的原理 信息理论背景 频率统计 Huffman树 Huffman编码的代码示例 数据结构 权重选择 Huffman编码生成 完整示例 完整代码 测试截图 Huffman编码的应用 总结 介绍 在这个数字时代,数据的有效压缩和传输变得至关重要。Hu…

一文学会使用WebRTC API

WebRTC(Web Real-Time Communication)是一项开放标准和技术集合,由 W3C 和 IETF 等组织共同推动和维护,旨在通过Web浏览器实现实时通信和媒体流传输。WebRTC于2011年6月1日开源并在Google、Mozilla、Opera支持下被纳入万维网联盟的…

校园智慧党建小程序源码系统 带完整的搭建教程

大家好啊,今天来给大家分享一款校园智慧党建小程序源码系统。一起来看看吧。以下是部分功能代码图: 系统特色功能一览: 智能化管理:采用人工智能、大数据、云计算等技术手段,实现自动化、智能化管理。例如&#xff0c…

Windows:Arduino IDE 开发环境配置【保姆级】

物联网开发学习笔记——目录索引 参考官网:Arduino - Home Arduino是一款简单易学且功能丰富的开源平台,包含硬件部分(各种型号的Arduino开发板)和软件部分(Arduino IDE)以及广大爱好者和专业人员共同搭建和维护的互联…

Python合并多个相交矩形框

Python合并多个相交矩形框 前言前提条件相关介绍实验环境Python合并多个相交矩形框代码实现 前言 由于本人水平有限,难免出现错漏,敬请批评改正。更多精彩内容,可点击进入Python日常小操作专栏、YOLO系列专栏、自然语言处理专栏或我的个人主页…

网页在线打开PDF_网站中在线查看PDF之pdf.js

一、pdf.js简介 PDF.js 是一个使用 HTML5 构建的便携式文档格式查看器。 pdf.js 是社区驱动的,并由 Mozilla 支持。我们的目标是为解析和呈现 PDF 创建一个通用的、基于 Web 标准的平台。 pdf.js 将 PDF 文档转换为 HTML5 Canvas 元素,并使用 JavaScr…

微信小程序引入阿里巴巴iconfont图标并使用

介绍 在小程序里,使用阿里巴巴的图标,如下所示: 使用方式 搜索自己需要的图标,然后将需要用到的图标加入购物车,如下图所示: 去右上角,点击购物车按钮;这里第一次使用,会有三个提…

Triple协议的隐式参数传递过程

前言 Dubbo 框架的 RPC 调用除了可以传递正常的接口参数外,还支持隐式参数传递。 隐式参数的传递依赖 RpcContext 对象,它持有一个 Map 对象,消费者往 Map 里写入数据,客户端在发起 RPC 调用前会构建 RpcInvocation,然…

华为eNSP配置专题-ACL的配置

文章目录 华为eNSP配置专题-ACL的配置1、前置环境1.1、宿主机1.2、eNSP模拟器 2、基本环境搭建2.1、基本终端构成和连接2.2、各终端基本配置2.2.1、PC1和PC2的配置2.2.2、模拟互联网的路由器的配置2.2.3、财务部服务器的配置2.2.4、路由器AR1的配置 2.3、让各网段能够ping通互联…

IDEA如何设置项目包名分级

按上面的勾选即可!