Spark编程实验三:Spark SQL编程

目录

一、目的与要求

二、实验内容

三、实验步骤

1、Spark SQL基本操作

2、编程实现将RDD转换为DataFrame

3、编程实现利用DataFrame读写MySQL的数据

四、结果分析与实验体会


一、目的与要求

1、通过实验掌握Spark SQL的基本编程方法;
2、熟悉RDD到DataFrame的转化方法;
3、熟悉利用Spark SQL管理来自不同数据源的数据。

二、实验内容

1、Spark SQL基本操作

        将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:
(1)查询所有数据;
(2)查询所有数据,并去除重复的数据;
(3)查询所有数据,打印时去除id字段;
(4)筛选出age>30的记录;
(5)将数据按age分组;
(6)将数据按name升序排列;
(7)取出前3行数据;
(8)查询所有记录的name列,并为其取别名为username;
(9)查询年龄age的平均值;
(10)查询年龄age的最小值。

2、编程实现将RDD转换为DataFrame

        源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

3、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

三、实验步骤

1、Spark SQL基本操作

        将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":"Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:

>>> spark=SparkSession.builder.getOrCreate()
>>> df = spark.read.json("file:///home/zhc/mycode/sparksql/employee.json")

(1)查询所有数据;

>>> df.show()

(2)查询所有数据,并去除重复的数据;

>>> df.distinct().show()

(3)查询所有数据,打印时去除id字段;

>>> df.drop("id").show()

(4)筛选出age>30的记录;

>>> df.filter(df.age > 30).show()

(5)将数据按age分组;

>>> df.groupBy("age").count().show()

(6)将数据按name升序排列;

>>> df.sort(df.name.asc()).show()

(7)取出前3行数据;

>>> df.take(3)

(8)查询所有记录的name列,并为其取别名为username;

>>> df.select(df.name.alias("username")).show()

(9)查询年龄age的平均值;

>>> df.agg({"age": "mean"}).show()

(10)查询年龄age的最小值。

>>> df.agg({"age": "min"}).show()

2、编程实现将RDD转换为DataFrame

        源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

首先,在“/home/zhc/mycode/sparksql”目录下创建文件employee.txt

​​​​​​​[root@bigdata sparksql]# vi employee.txt

然后,在该目录下新建一个py文件命名为rddtodf.py,然后写入如下py程序:

[root@bigdata sparksql]# vi rddtodf.py
#/home/zhc/mycode/sparksql/rddtodf.py
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
if __name__ == "__main__":sc = SparkContext("local","Simple App")spark=SparkSession(sc)peopleRDD = spark.sparkContext.textFile("file:home/zhc/mycode/sparksql/employee.txt")rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()rowRDD.createOrReplaceTempView("employee")personsDF = spark.sql("select * from employee")personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)

最后,运行该程序:

[root@bigdata sparksql]# python3 rddtodf.py

3、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

首先,启动mysql服务并进入到mysql数据库中:

[root@bigdata sparksql]# systemctl start mysqld.service
[root@bigdata sparksql]# mysql -u root -p

然后开始接下来的操作。

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表所示的两行数据。

mysql> create database sparktest;
mysql> use sparktest;
mysql> create table employee (id int(4), name char(20), gender char(4), age int(4));
mysql> insert into employee values(1,'Alice','F',22);
mysql> insert into employee values(2,'John','M',25);

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表所示的三行数据到MySQL中,最后打印出age的最大值和age的总和。

首先,在“/home/zhc/mycode/sparksql”目录下面新建一个py程序并命名为mysqltest.py。

[root@bigdata sparksql]# vi mysqltest.py

接着,写入如下py程序: 

#/home/zhc/mycode/sparksql/mysqltest.py
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
#下面设置模式信息
schema = StructType([StructField("id",IntegerType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
employeeRDD = spark.sparkContext.parallelize(["3 Mary F 26","4 Tom M 23","5 zhanghc M 21"]).map(lambda x:x.split(" "))
#下面创建Row对象,每个Row对象都是rowRDD中的一行
rowRDD = employeeRDD.map(lambda p:Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
#建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
employeeDF = spark.createDataFrame(rowRDD, schema)
#写入数据库
prop = {}
prop['user'] = 'root'
prop['password'] = 'MYsql123!'
prop['driver'] = "com.mysql.jdbc.Driver"
employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest?useSSL=false",'employee','append', prop)
employeeDF.collect()
employeeDF.agg({"age": "max"}).show()
employeeDF.agg({"age": "sum"}).show()

然后,直接运行该py程序即可得到结果:

[root@bigdata sparksql]# python3 mysqltest.py

最后,到MySQL Shell中,即可查看employee表中的所有信息。

mysql> select * from employee;

四、结果分析与实验体会

        Spark SQL是Apache Spark中用于处理结构化数据的模块。它提供了一种类似于SQL的编程接口,可以用于查询和分析数据。通过实验掌握了Spark SQL的基本编程方法,SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表,然后使用SQL语句来操作数据。
        在使用Spark SQL之前,需要创建一个SparkSession对象。可以使用SparkSession的read方法加载数据。可以使用DataFrame的createOrReplaceTempView方法将DataFrame注册为一个临时视图。可以使用SparkSession的sql方法执行SQL查询。除了使用SQL查询外,还可以使用DataFrame的API进行数据操作和转换。可以使用DataFrame的write方法将数据写入外部存储。在使用完SparkSession后,应该调用其close方法来关闭SparkSession。
        最后,还掌握了RDD到DataFrame的转化方法,并可以利用Spark SQL管理来自不同数据源的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/298297.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习中用来训练的train.py 探究学习2.0( 数据预处理)

数据预处理 下列代码为train.py中常见的一些数据处理方法 train_transform transforms.Compose([transforms.Resize((224, 224)),transforms.RandomVerticalFlip(),# 随机旋转,-45度到45度之间随机选transforms.RandomRotation(45),# 从中心开始裁剪transforms.C…

旧衣回收小程序搭建有什么优势?

今年以来,旧衣回收行业分外火热,不断有创业者进入到市场中,其中不乏有年轻人,足以可见行业的火爆。 我国是人口大国,每个人闲置的衣物加在一起的数量难以计算,旧衣回收行业具有巨大的发展空间。 此外&…

【FPGA】Verilog 实践:优先级编码器 | Priority encoder

0x00 优先级编码器(Priority encoder) "能将多个二进制输入压缩成更少数目输出的电路或算法的编码器" 优先级编码器是一种编码器,它考虑了两个或更多输入位同时变为 1 但没有收到输入的情况。当输入进来时,优先级编码…

图像ISP处理——畸变校正算法

图像畸变校正算法主要用于矫正图像中因为摄像机镜头畸变而引起的形状和尺寸变化。摄像机镜头畸变主要包括径向畸变和切向畸变。以下是一些常见的图像畸变校正算法: 多项式畸变校正法(Polynomial Distortion Correction): 原理&am…

数据库开发表操作案例的详细解析

2.3.1.4 案例 需求:根据产品原型/需求创建表((设计合理的数据类型、长度、约束) 产品原型及需求如下: 步骤: 阅读产品原型及需求文档,看看里面涉及到哪些字段。 查看需求文档说明,确认各个字段的类型以及字段存储数据…

将Go语言开发的Web程序部署到K8S

搭建K8S基础环境 如果已经有K8S环境的同学可以跳过,如果没有,推荐你看看我的《Ubuntu22加Minikue搭建K8S环境》,课程目录如下: Ubuntu22安装Vscode 下载:https://code.visualstudio.com/Download 安装命令&#…

【Cisco Packet Tracer】综合实践题-校园网仿真

本题的目的: 理论与实践结合:Cisco Packet Tracer是一个网络模拟软件,通过模拟真实的网络环境,可以让学生在实际操作中加深对理论知识的理解和掌握。问题解决能力:综合实验题可以考察学生分析和解决问题的能力。在实验…

新版IDEA中Git的使用(二)

说明:前面介绍了在新版IDEA中Git的基本操作,本文介绍关于分支合并、拉取等操作; 例如,现在有一个项目,分支如下: main:主分支; dev:开发分支; test&#x…

第11章 GUI Page435 降低画面闪烁感

运行效果: 画一个圆的过程中出现 如果把窗口最小化,再恢复,则又恢复成了一个圆 关键代码: 进入wxSmith界面,为ScrolledWindow1添加一个EVT_ERASE_BACKGROUND响应事件 这是一个空处理函数:什么也不做&…

读算法霸权笔记03_操控与恐吓

1. 市场失灵 1.1. 探索市场失灵现象就像寻宝游戏,很有意思 1.2. 建立一种算法来预测这个循环出现差错——多找回的两枚硬币——的可能性并对此下注 1.2.1. 盈利模式消失了,或者市场中的其他人也了解了这个模式,先行者优势消失 1.3. 在很多…

LLM大语言模型(三):使用ChatGLM3-6B的函数调用功能前先学会Python的装饰器

目录 ChatGLM3-6B的函数调用模式示例 本地启动ChatGLM3-6B工具模式 如何在ChatGLM3-6B里新增一个自定义函数呢? get_weather基于Python的装饰器实现 函数注解register_tool 现在我们来自定义一个kuakuawo()函数 ChatGLM3-6B的函数调用模式示例 ChatGLM3-6B目前…

OAuth 2.0 入门指南:掌握授权码模式

一、授权码模式 (1)spring-security-oauth2 从2.4.x版本开始,EnableAuthorizationServer注解就弃用过时了 (2)当前演示Demo版本:springboot的1.5.x版本与spring-security-oauth2的2.3.8.RELEASE整合&#…