SparkSQL学习03-数据读取与存储

文章目录

    • 1 数据的加载
      • 1.1 方式一:spark.read.format
        • 1.1.1读取json数据
        • 1.1.2 读取jdbc数据
      • 1.2 方式二:spark.read.xxx
        • 1.2.1 读取json数据
        • 1.2.2 读取csv数据
        • 1.2.3 读取txt数据
        • 1.2.4 读取parquet数据
        • 1.2.5 读取orc数据
        • 1.2.6 读取jdbc数据
    • 2 数据的保存
      • 2.1 方式一:spark.write.format
        • 2.1.1 读取orc数据
      • 2.2 方式二:spark.write.xxx
        • 2.2.1 写入到jdbc数据库中

SparkSQL提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API,根据不同的参数读取和保存不用格式的数据。SparkSQL默认读取和保存的文件格式为parquet,parquet是一种能够有效存储嵌套数据的列式存储格式。

1 数据的加载

SparkSQL提供了两种方式可以加载数据

1.1 方式一:spark.read.format

  • spark.read.format读取数据文件格式.load加载数据路径”
  • 数据文件格式包括csv、jdbc、json、orc、parquet和textFile。
  • 需要注意:在读取jdbc时需要在format和load之间添加多个option进行相应的JDBC参数设置【url、user、password.tablename】load中不用传递路经空参数即可
  • 数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format
1.1.1读取json数据

json数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()//使用第一种范式加载数据var frame: DataFrame = session.read.format("json").load("data/people.json")frame.printSchema()/*** 运行结果:root|-- age: long (nullable = true)|-- height: double (nullable = true)|-- name: string (nullable = true)|-- province: string (nullable = true)*/frame.show()/*** 运行结果:+---+------+-------+--------+|age|height|   name|province|+---+------+-------+--------+| 10| 168.8|Michael|    广东|| 30| 168.8|   Andy|    福建|| 19| 169.8| Justin|    浙江|| 32| 188.8| 王启峰|    广东|| 10| 168.8|   John|    河南|| 19| 179.8|   Domu|    浙江|+---+------+-------+--------+* */}
}
1.1.2 读取jdbc数据

读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()// 如果读取的JDBC操作(即读取mysql中的数据)val frame = session.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/mydb1").option("dbtable","location_info").option("user","root").option("password","123456").load()frame.printSchema()}
}

1.2 方式二:spark.read.xxx

  • 上述的书写方式太过项,所以SparksQL推出了更加便捷的方式spark.read.xxx加载数据路径”)
  • XXX包括csv、jdbc、json、orc、parquet和text
  • 需要注意:在读取jdbc时方法参数为三个分别为【url、tablename、properties对象】,其中properties对象中存储的是【user,password】
1.2.1 读取json数据

json数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()//【推荐使用】第二种方式进行读取操作val frame = session.read.json("data/people.json")frame.printSchema()/**root|-- age: long (nullable = true)|-- height: double (nullable = true)|-- name: string (nullable = true)|-- province: string (nullable = true)*/frame.show()/**+---+------+-------+--------+|age|height|   name|province|+---+------+-------+--------+| 10| 168.8|Michael|    广东|| 30| 168.8|   Andy|    福建|| 19| 169.8| Justin|    浙江|| 32| 188.8| 王启峰|    广东|| 10| 168.8|   John|    河南|| 19| 179.8|   Domu|    浙江|+---+------+-------+--------+ */}}
1.2.2 读取csv数据

csv数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()val frame = session.read.csv("data/country.csv")frame.printSchema()/**root|-- _c0: string (nullable = true)|-- _c1: string (nullable = true)|-- _c2: string (nullable = true)*/frame.show()/**+---+----------------+---+|_c0|             _c1|_c2|+---+----------------+---+|  1|            中国|  1||  2|      阿尔巴尼亚|ALB||  3|      阿尔及利亚|DZA||  4|          阿富汗|AFG||  5|          阿根廷|ARG||  6|阿拉伯联合酋长国|ARE||  7|          阿鲁巴|ABW||  8|            阿曼|OMN||  9|        阿塞拜疆|AZE|| 10|        阿森松岛|ASC|| 11|            埃及|EGY|| 12|      埃塞俄比亚|ETH|| 13|          爱尔兰|IRL|| 14|        爱沙尼亚|EST|| 15|          安道尔|AND|| 16|          安哥拉|AGO|| 17|          安圭拉|AIA|| 18|安提瓜岛和巴布达|ATG|| 19|        澳大利亚|AUS|| 20|          奥地利|AUT|+---+----------------+---+*/}}
1.2.3 读取txt数据

txt数据:
在这里插入图片描述
读取代码:

package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()val frame = session.read.text("data/dailykey.txt")frame.printSchema()/**root|-- value: string (nullable = true)* */frame.show()/**+--------------------+|               value|+--------------------+|2018-11-13\ttom\t...||2018-11-13\ttom\t...||2018-11-13\tjohn\...||2018-11-13\tlucy\...||2018-11-13\tlucy\...||2018-11-13\tjohn\...||2018-11-13\tricha...||2018-11-13\tricha...||2018-11-13\tricha...||2018-11-14\ttom\t...||2018-11-14\ttom\t...||2018-11-14\ttom\t...|+--------------------+* */}}
1.2.4 读取parquet数据
package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()val frame = session.read.parquet("data/users.parquet")frame.printSchema()/**root|-- name: string (nullable = true)|-- favorite_color: string (nullable = true)|-- favorite_numbers: array (nullable = true)|    |-- element: integer (containsNull = true)*/frame.show()/*+------+--------------+----------------+|  name|favorite_color|favorite_numbers|+------+--------------+----------------+|Alyssa|          null|  [3, 9, 15, 20]||   Ben|           red|              []|+------+--------------+----------------+*/}
}
1.2.5 读取orc数据
package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()val frame = session.read.orc("data/student.orc")frame.printSchema()/**root|-- id: string (nullable = true)|-- name: string (nullable = true)|-- age: string (nullable = true)|-- gender: string (nullable = true)|-- course: string (nullable = true)|-- score: string (nullable = true)*/frame.show()/**+---+------+---+------+-------+-----+| id|  name|age|gender| course|score|+---+------+---+------+-------+-----+| 12|  张三| 25|    男|chinese|   50|| 12|  张三| 25|    男|   math|   60|| 12|  张三| 25|    男|english|   70|| 12|  李四| 20|    男|chinese|   50|| 12|  李四| 20|    男|   math|   50|| 12|  李四| 20|    男|english|   50|| 12|  王芳| 19|    女|chinese|   70|| 12|  王芳| 19|    女|   math|   70|| 12|  王芳| 19|    女|english|   70|| 13|张大三| 25|    男|chinese|   60|| 13|张大三| 25|    男|   math|   60|| 13|张大三| 25|    男|english|   70|| 13|李大四| 20|    男|chinese|   50|| 13|李大四| 20|    男|   math|   60|| 13|李大四| 20|    男|english|   50|| 13|王小芳| 19|    女|chinese|   70|| 13|王小芳| 19|    女|   math|   80|| 13|王小芳| 19|    女|english|   70|+---+------+---+------+-------+-----+*/}
}
1.2.6 读取jdbc数据
package _02SparkSQL
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}object _06SparkReadData {def main(args: Array[String]): Unit = {val session = SparkSession.builder().appName("SparkReadData").master("local[*]").getOrCreate()// 读取jdbc文件val properties = new Properties()properties.put("user","root")properties.put("password","123456")val frame = session.read.jdbc("jdbc:mysql://localhost:3306/mydb1","location-info",properties)frame.printSchema()frame.show()}
}

2 数据的保存

SparkSQL提供了两种方式可以保存数据

2.1 方式一:spark.write.format

  • spark.write.format(“保存数据格式”).mode(“存储格式”).save(“存储数据路径”)
  • 数据文件格式包括csv、jdbc、json、orc、parquet和textFile。
  • 保存数据可以使用SaveMode,用来指明如何处理数据,使用mode()方法来设置
  • SaveMode是一个枚举类,其中的常量包括:
scala/javaAny LanguageMeaning
SaveMode.ErrorifExists(default)“error”(default)如果文件已经存在,则抛出异常
SaveMode.Append“append”如果文件已经存在,则追加
SaveMode.Overwrite“overwrite”如果文件已经存在,则覆盖
SaveMode.Ignore“ignore”如果文件已经存在,则忽略

需要注意:在读取jdbc时需要在format和save之间添加多个option进行相应的JDBC参数设置【url、user、password、tablename】save中不用传递路经空参数即可,可以不用设置mode

数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format

2.1.1 读取orc数据
package _02SparkSQL
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object _07SparkWriteData {def main(args: Array[String]): Unit = {//提供SparkSession对象val session = SparkSession.builder().appName("SparkWriteData").master("local").getOrCreate()//先读取数据var frame: DataFrame = session.read.orc("data/student.orc")//保存到某个路径下,OWstudent为文件夹,不需要文件名frame.write.format("json").mode(SaveMode.Overwrite).save("data/OWstudent")session.stop()}
}

最后结果为:
在这里插入图片描述

2.2 方式二:spark.write.xxx

上述的书写方式太过繁项,所以SparksQL推出了更加便捷的方式:

  • spark.write.xxx(“保存数据路径”)
  • XXX包括csv、jdbc、json、orc、parquet和text
  • 需要注意:在保存jdbc时方法参数为三个分别为【url、tablename、properties对象】,其中properties对象中存储的是【user,password】
  • mode可以选择性设置
2.2.1 写入到jdbc数据库中
package _02SparkSQL
import java.util.Propertiesimport org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object _07SparkWriteData {def main(args: Array[String]): Unit = {//提供SparkSession对象val session = SparkSession.builder().appName("SparkWriteData").master("local").getOrCreate()//先读取数据var frame: DataFrame = session.read.orc("data/student.orc")val properties = new Properties()properties.put("user","root")properties.put("password","123456")frame.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/mydb1","student",properties)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/487240.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机网络面经-HTTP的8种请求方式

简单介绍 HTTP是超文本传输协议,其定义了客户端与服务器端之间文本传输的规范。HTTP默认使用80端口,这个端口指的是服务端的端口,而客户端使用的端口是动态分配的。当我们没有指定端口访问时,浏览器会默认帮我们添加80端口。我们…

29. 【Linux教程】Linux 用户介绍

本小节介绍 Linux 用户的基础知识,了解 Linux 系统中有哪些用户,如何查看当前 Linux 系统中有哪些用户,每一个 Linux 用户的权限取决于这些账号登录时获取到的权限。 1. Linux 用户类型 Linux 系统是一个多用户多任务的操作系统,…

Lua速成(2)

一、流程控制 Lua 编程语言流程控制语句通过程序设定一个或多个条件语句来设定。在条件为 true 时执行指定程序代码,在条件为 false 时执行其他指定代码。 控制结构的条件表达式结果可以是任何值,Lua认为false和nil为假,true和非nil为真。 …

python 基础知识点(蓝桥杯python科目个人复习计划49)

今日复习内容:做复习题 例题1:希尔排序 题目描述: 希尔排序是直接插入排序算法的一种更高效的改进版本,但它是非稳定排序算法。希尔排序是基于插入排序的以下两点性质而提出的改进方法之一: 1.插入排序在对几乎已经…

CVE-2023-44313 Apache ServiceComb Service-Center SSRF 漏洞研究

本次项目基于go语言(本人不精通),虽不是java web框架了 ,但搭建web服务的框架一些思想理念却是通用的,我们由此可以得到一些蛛丝马迹....... 目录 漏洞简介 漏洞分析 漏洞复现 漏洞简介 Apache ServiceComb Servi…

10:部署Dashboard|部署Prometheus|HPA集群

部署Dashboard|部署Prometheus|HPA集群 Dashboard部署Dashboard上传镜像到私有仓库安装服务发布服务创建管理用户查看登录的Token信息 Prometheus步骤一:导入所有后续需要的镜像到私有镜像仓库(在master主机操作操作)步…

BERT学习笔记

论文:《BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding》,2019 代码:[tensorflow],[pytorch] 来源:李沐精度BERT 0、摘要 与之前模型的区别: GPT考虑的是一个单向…

Sora技术详解及影响分析!

Datawhale干货 作者:李孝杰,清华大学,Datawhale成员 从openai sora[1]的技术报告首段可以看出sora的野心远远不止视频生成,他们的目标是通过视频数据来学习一个世界模型或者世界模拟器,这才是真正令人兴奋和激动的部分…

(done) 什么是特征值和特征向量?如何求特征值的特征向量 ?如何判断一个矩阵能否相似对角化?

什么是齐次方程? https://blog.csdn.net/shimly123456/article/details/136198159 行列式和是否有解的关系? https://blog.csdn.net/shimly123456/article/details/136198215 特征值和特征向量 参考视频:https://www.bilibili.com/video/BV…

LeetCode206: 反转链表.

题目描述 给你单链表的头节点 head ,请你反转链表,并返回反转后的链表。 示例 解题方法 假设链表为 1→2→3→∅,我们想要把它改成∅←1←2←3。在遍历链表时,将当前节点的 next指针改为指向前一个节点。由于节点没有引用其前一…

npm install 失败,需要node 切换到 对应版本号

npm install 失败 原本node 的版本号是16.9,就会报以上错误 node版本问题了,我切到这个版本,报同样的错。降一下node(14.18)版本就好了 具体的方法:(需要在项目根目录下切换) 1. …

Linux虚拟机配置静态ip教程

文章目录 为什么要配置静态ip?一、找到网络配置文件二、修改网络配置文件三、重启网络服务或指定的网卡 为什么要配置静态ip? 稳定性和可靠性 。静态IP地址是固定的,不会随着时间或网络连接的变化而改变。这意味着其他设备可以始终准确地找到…