学习进度笔记⑧

news/2025/1/14 19:47:51/文章来源:https://www.cnblogs.com/binglinll/p/18671460

Spark SQL基本操作

将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。

{ "id":1 , "name":" Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为 employee.json 创建 DataFrame,并写出 Scala 语句完成下列操作:

 创建DataFrame

(1) 查询所有数据;

(2) 查询所有数据,并去除重复的数据;

(3) 查询所有数据,打印时去除 id 字段;

(4) 筛选出 age>30 的记录;

(5) 将数据按 age 分组;

(6) 将数据按 name 升序排列;

(7) 取出前 3 行数据;

(8) 查询所有记录的 name 列,并为其取别名为 username;

(9) 查询年龄 age 的平均值;

(10) 查询年龄 age 的最小值。

2、编程实现将RDD转换为DataFrame

源文件内容如下(包含 id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

请将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代码。

假设当前目录为/usr/local/spark/mycode/rddtodf,在当前目录下新建一个目录 mkdir -p src/main/scala ,然后在目录 /usr/local/spark/mycode/rddtodf/src/main/scala 下 新 建 一 个 rddtodf.scala,复制下面代码;(下列两种方式任选其一)

方法一:利用反射来推断包含特定类型对象的 RDD 的 schema,适用对已知数据结构的 RDD 转换;

复制代码
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._
object RDDtoDF {def main(args: Array[String]) {case class Employee(id:Long,name: String, age: Long)val employeeDF =
spark.sparkContext.textFile("file:///usr/local/spark/employee.txt").map(_.split(",")).map(at
tributes => Employee(attributes(0).trim.toInt,attributes(1), attributes(2).trim.toInt)).toDF()employeeDF.createOrReplaceTempView("employee")val employeeRDD = spark.sql("select id,name,age from employee")employeeRDD.map(t => "id:"+t(0)+","+"name:"+t(1)+","+"age:"+t(2)).show()}
}
复制代码

方法二:使用编程接口,构造一个 schema 并将其应用在已知的 RDD 上。

复制代码
import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoder import org.apache.spark.sql.Row object RDDtoDF {def main(args: Array[String]) {val employeeRDD =spark.sparkContext.textFile("file:///usr/local/spark/employee.txt")val schemaString = "id name age"val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))val schema = StructType(fields)val rowRDD = employeeRDD.map(_.split(",")).map(attributes => Row(attributes(0).trim, attributes(1), attributes(2).trim))val employeeDF = spark.createDataFrame(rowRDD, schema)employeeDF.createOrReplaceTempView("employee")val results = spark.sql("SELECT id,name,age FROM employee")results.map(t => "id:"+t(0)+","+"name:"+t(1)+","+"age:"+t(2)).show()} }
复制代码

在目录/usr/local/spark/mycode/rddtodf 目录下新建 simple.sbt,复制下面代码:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" % "spark-core" % "2.1.0"

在目录/usr/local/spark/mycode/rddtodf 下执行下面命令打包程序

 最后在目录/usr/local/spark/mycode/rddtodf 下执行下面命令提交程序

在终端即可看到输出结果。

3、编程实现利用DataFrame读写Mysql的数据

(1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee,包含如表 6-2 所示的两行数据。

表 6-2 employee 表原有数据

id name gender Age

1  Alice    F         22

2  John    M        25

(2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表 6-3 所示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。

表 6-3 employee 表新增数据

id name gender age

3  Mary     F       26

4  Tom      M      23 

假设当前目录为/usr/local/spark/mycode/testmysql,在当前目录下新建一个目录 mkdir -p src/main/scala , 然 后 在 目 录 /usr/local/spark/mycode/testmysql/src/main/scala 下 新 建 一 个 testmysql.scala,复制下面代码;

复制代码
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
object TestMySQL {def main(args: Array[String]) {val employeeRDD = spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M23")).map(_.split(" "))val schema = StructType(List(StructField("id", IntegerType,true),StructField("name", StringType, true),StructField("gender", StringType,
true),StructField("age", IntegerType, true)))val rowRDD = employeeRDD.map(p => Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))val employeeDF = spark.createDataFrame(rowRDD, schema)val prop = new Properties()prop.put("user", "root")prop.put("password", "hadoop")prop.put("driver","com.mysql.jdbc.Driver")employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest",sparktest.employee", prop)val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").optio
n("dbtable","employee").option("user","root").option("password", "hadoop").load()jdbcDF.agg("age" -> "max", "age" -> "sum")}
}
复制代码

在目录/usr/local/spark/mycode/testmysql 目录下新建 simple.sbt,复制下面代码:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies += "org.apache.spark" % "spark-core" % "2.1.0"

在目录/usr/local/spark/mycode/testmysql 下执行下面命令打包程序

 最后在目录/usr/local/spark/mycode/testmysql 下执行下面命令提交程序

在终端即可看到输出结果。

遇到的问题

解决方法

添加如下代码:

val spark = org.apache.spark.sql.SparkSession.builder
.master("local")
.appName("Spark CSV Reader")
.getOrCreate;

import spark.implicits._

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/869218.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于大数据分析的智能交通灯管理系统

在智慧交通领域,交通灯管理系统是城市交通流控制的核心。随着大数据技术的发展,基于大数据分析的智能交通灯管理系统正逐渐成为改善城市交通状况、提升道路使用效率的关键技术。今天,我们将深入探讨这一系统的建设内容,共同展望智慧交通的未来。系统概述基于大数据分析的智…

Shell程序设计语言

Shell程序设计语言 一、认识Shell 1.1 编程语言的种类 # 机器语言:站在计算机(奴隶)的角度,说计算机能听懂的语言,那就是直接用二进制编程,直接操作硬件;优点:执行效率最高缺点:1、二进制指令难以记忆,开发时极容易出错2、开发程序的复杂度高:即便是完成一个简单的功能…

logstash输出到loki

运行logstash # logstash version: 3 services:logstash:image: docker.elastic.co/logstash/logstash:8.12.0container_name: logstash_serverrestart: alwaysports:- 8065:8065environment:- LS_JAVA_OPTS=-Xmx1024m -Xms1024mvolumes:- ./config/logstash.conf:/etc/logstas…

【Electron 应用安全测试基础】Electron 框架介绍

免责声明 ⽂中所涉及的技术、思路和⼯具仅供以安全为⽬的的学习交流使⽤,任何⼈不得将其⽤于⾮法⽤途以及盈利等⽬的,否则后果⾃⾏承担。所有渗透都需获取授权!一、引言 跨平台桌面应用开发的演进带来了一系列独特的挑战,主要体现在如何确保在 Windows、macOS 和基于 Linux…

07jdk7u21原生利用链

JDK7u21 反序列化的关键在于找到可以动态方法执行的代码:例如CC链中的Transformer,CB链中的PropertyUtils#getPropertyJDK7u21中动态方法执行的点,AnnotationInvocationHandler#equalsImpl中的hisValue = memberMethod.invoke(o)。 private Boolean equalsImpl(Object o) {i…

英语语法(介词和连词)

认识介词 例子: 例子: 识别介词 时间介词 at 精确的时间 by 不缺定的时间 for 持续的时间 in 固定的时间 on 某一天的时间 since 开始时间 until 直到 空间介词 at 指一个点 by 近的意思 from 从别处到这里 in 一个封闭的区域 off 离开 on 开启 out 方向 远离…

一文读懂如何创建食品加工和包装 SOP

在食品行业中,确保产品的安全性和质量是至关重要的。为了实现这一目标,建立一套详尽且高效的标准操作程序(SOP)对于食品加工与包装环节来说不可或缺。本文将引导您深入理解如何高效创建食品加工与包装SOP,并巧妙融入helplook工具,以提升流程管理的精确性和效率。一、明确…

高性能、零内存分配的Go日志库--Zerolog

简介 Zerolog 是一个高性能、零内存分配的 Go 日志库。 它为不需要垃圾回收的延迟敏感型应用程序提供结构化日志记录功能。 您可以以完全零分配的方式使用,这样在初始化记录器对象后,堆上不会再分配其他对象,从而防止触发垃圾回收。 Zerolog 包提供了一个专用于 JSON 输出的…

Jar Analyzer:JAR包分析工具

工具介绍 一个JAR包分析工具,批量分析,SCA漏洞分析,方法调用关系搜索,字符串搜索,Spring组件分析,信息泄露检查,CFG程序分析,JVM栈帧分析,进阶表达式搜索,字节码指令级的动态调试分析,反编译JAR包一键导出,一键提取序列化数据恶意代码,一键分析BCEL字节码。 Jar A…

第二十次作业

1、对bluecms进行代码审计,分析复现文件上传、ssti模板注入、文件删除等漏洞 文件上传:ssti模板注入: bluecms采⽤了smarty模板引擎可以在模板中插⼊恶意的代码,从⽽执⾏任意命令。此处可知smarty使⽤的界定符为{#和#},参考smarty官⽅⽂档可知,可以使⽤{#php#}code{#/php…

【JAVA开发】企业Java开发:七款提升开发者工作效率的Java分析工具

一、引言 编写正确的代码至关重要,但同样重要的是理解代码在实际环境中的表现。Java 分析工具在这方面提供了极大的便利,它们可以帮助您深入了解程序的内部运作。虽然进行性能分析可能需要额外的时间投入,但它最终能够为您节省大量调试时间。 Java 分析器提供了关于 CPU 使用…

JS-32 数组方法_shift()/unshift()

shift方法用于删除数组的第一个元素,并返回该元素。注意,该方法会改变原数组 var arr=[字符串,zifuchuan,前端]; arr.shift()//字符串 arr//[zifuchuan,前端] shift方法可以遍历并清空一个数组 var list=[1,2,3,4,5,6]; var item;while(item=list.shift()){ console.log(ite…