大数据分析与应用实验任务十

大数据分析与应用实验任务十

实验目的:

  • 通过实验掌握spark SQL的基本编程方法;

  • 熟悉RDD到DataFrame的转化方法;

  • 通过实验熟悉spark SQL管理不同数据源的方法。

实验任务:

进入pyspark实验环境,在桌面环境打开jupyter notebook,或者打开命令行窗口,输入pyspark,完成下列任务:

实验一、参考教材5.3-5.6节各个例程编写代码,逐行理解并运行。
1. DataFrame 的创建

在编写独立应用程序时,可以通过如下语句创建一个 SparkSession 对象:

from pyspark import SparkContext,SparkConf 
from pyspark.sql import SparkSession 
sparklzy = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

读取在“/usr/local/spark/examples/src/main/resources/”目录下的样例数据 people.json

dfluozhongye = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
dfluozhongye.show()

image-20231130112410232

2. DataFrame 的保存
peopleDFlzy = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json") peopleDFlzy.select("name", "age").write.format("json").save("file:///root/Desktop/luozhongye/newpeople.json")peopleDFlzy.select("name").write.format("text").save("file:///root/Desktop/luozhongye/newpeople.txt")

image-20231130112652757

如果要再次读取 newpeople.json 中的数据生成 DataFrame,可以直接使用 newpeople.json 目录名称,而不需要使用 part-00000-3db90180-ec7c-4291-ad05-df8e45c77f4d.json 文件(当然,使用这个文件也可以),代码如下:

peopleDFlzy = spark.read.format("json").load("file:///root/Desktop/luozhongye/newpeople.json") 
peopleDFlzy.show()

image-20231130112748692

3. DataFrame 的常用操作

创建好DataFrame以后,可以执行一些常用的DataFrame操作,包括printSchema()、select()、filter()、groupBy()和 sort()等。在执行这些操作之前,先创建一个 DataFrame:

dflzy=spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
(1) printSchema()

可以使用 printSchema()操作打印出 DataFrame 的模式(Schema)信息

dflzy.printSchema()

image-20231130112956579

(2) select()

select()操作的功能是从 DataFrame 中选取部分列的数据。

# select()操作选取了 name和 age 这两个列,并且把 age 这个列的值增加 1。
dflzy.select(dflzy['name'],dflzy['age']+1,).show()

image-20231130113032027

(3) filter()

filter()操作可以实现条件查询,找到满足条件要求的记录。

# 用于查询所有 age 字段的值大于 20 的记录。
dflzy.filter(dflzy["age"]>20)

image-20231130113111358

(4) groupBy()

groupBy()操作用于对记录进行分组。

# 根据 age 字段进行分组,并对每个分组中包含的记录数量进行统计
dflzy.groupBy("age").count().show()

image-20231130113200672

(5) sort()

sort()操作用于对记录进行排序。

# 表示根据 age 字段进行降序排序;
dflzy.sort(dflzy["age"].desc()).show()
# 表示根据 age 字段进行降序排序,当 age 字段的值相同时,再根据 name 字段的值进行升序排序
dflzy.sort(dflzy["age"].desc(),dflzy["name"].asc()).show()

image-20231130113655460

4. 从 RDD 转换得到 DataFrame
(1) 利用反射机制推断 RDD 模式

把 “/usr/local/spark/examples/src/main/resources/”目录下的people.txt 加载到内存中生成一个 DataFrame,并查询其中的数据。完整的代码及其执行过程如下:

from pyspark.sql import Rowpeople = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line: line.split(",")).map(lambda p: Row(name=p[0], age=int(p[1])))
schemaPeople = spark.createDataFrame(people)
# 必须注册为临时表才能供下面的查询使用
schemaPeople.createOrReplaceTempView("people")
personsDF = spark.sql("select name,age from people where age > 20")
# DataFrame 中的每个元素都是一行记录,包含 name 和 age 两个字段,分别用 p.name 和 p.age 来获取值
personsRDD = personsDF.rdd.map(lambda p: "Name: " + p.name + "," + "Age: " + str(p.age))
personsRDD.foreach(print)

image-20231130113902840

(2)使用编程方式定义 RDD 模式

利用 Spark SQL 查询 people.txt

from pyspark.sql.types import *
from pyspark.sql import Row# 下面生成“表头”
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name inschemaString.split(" ")]
schema = StructType(fields)
# 下面生成“表中的记录
lines = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
parts = lines.map(lambda x: x.split(","))
people = parts.map(lambda p: Row(p[0], p[1].strip()))
# 下面把“表头”和“表中的记录”拼装在一起
schemaPeople = spark.createDataFrame(people, schema)
# 注册一个临时表供后面的查询使用
schemaPeople.createOrReplaceTempView("people")
results = spark.sql("SELECT name,age FROM people")
results.show()

image-20231130114042174

实验二、完成p113页实验内容第1题(spark SQL基本操作),另注意自行修改题目中的数据。
1. Spark SQL 基本操作

将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。

{ "id":1 , "name":" Ella" , "age":36 } 
{ "id":2, "name":"Bob","age":29 } 
{ "id":3 , "name":"Jack","age":29 } 
{ "id":4 , "name":"Jim","age":28 } 
{ "id":4 , "name":"Jim","age":28 } 
{ "id":5 , "name":"Damon" } 
{ "id":5 , "name":"Damon" }
{ "id":6 , "name":"罗忠烨" }

为 employee.json 创建 DataFrame,并编写 Python 语句完成下列操作:

from pyspark.sql import SparkSession# 创建 SparkSession
spark = SparkSession.builder.appName("SparkSQLBasicOperations").getOrCreate()# 读取 JSON 文件并创建 DataFrame
employee_dflzy = spark.read.json("/root/Desktop/luozhongye/employee.json")

(1)查询所有数据;

employee_dflzy.show()

image-20231130114320725

(2)查询所有数据,并去除重复的数据;

employee_dflzy.dropDuplicates().show()

image-20231130114352878

(3)查询所有数据,打印时去除 id 字段;

employee_dflzy.select("name", "age").show()

image-20231130114426647

(4)筛选出 age>30 的记录;

employee_dflzy.filter(employee_dflzy["age"] > 30).show()

image-20231130114455985

(5)将数据按 age 分组;

employee_dflzy.groupBy("age").count().show()

image-20231130114520305

(6)将数据按 name 升序排列;

employee_dflzy.orderBy("name").show()

image-20231130114547358

(7)取出前 3 行数据;

employee_dflzy.limit(3).show()

image-20231130114612796

(8)查询所有记录的 name 列,并为其取别名为 username;

employee_dflzy.select("name").withColumnRenamed("name", "username").show()

image-20231130114635538

(9)查询年龄 age 的平均值;

employee_dflzy.agg({"age": "avg"}).show()

image-20231130114703990

(10)查询年龄 age 的最小值。

employee_dflzy.agg({"age": "min"}).show()

image-20231130114736776

(11)停止 SparkSession

spark.stop()
2. 编程实现将 RDD 转换为 DataFrame

源文件employee.txt内容如下(包含 id,name,age):

1,Ella,36 
2,Bob,29 
3,Jack,29

实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代码。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType# 创建 SparkSession
spark = SparkSession.builder.appName("RDDtoDataFrame").getOrCreate()# 读取文本文件并创建 RDD
rdd = spark.sparkContext.textFile("/root/Desktop/luozhongye/employee.txt")# 定义数据模式
schema = StructType([StructField("id", IntegerType(), True),StructField("name", StringType(), True),StructField("age", IntegerType(), True)
])# 将 RDD 转换为 DataFrame
employee_df = rdd.map(lambda line: line.split(",")).map(lambda x: (int(x[0]), x[1], int(x[2]))).toDF(schema=schema)# 打印 DataFrame 的所有数据
employee_df.show(truncate=False)# 停止 SparkSession
spark.stop()

image-20231130120104859

3. 编程实现利用 DataFrame 读写 MySQL 的数据
(1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee, 包含下表 所示的两行数据。
idnamegenderage
1AliceF22
2JohnM25
-- 创建数据库
CREATE DATABASE IF NOT EXISTS sparktest;-- 切换到 sparktest 数据库
USE sparktest;-- 创建 employee 表
CREATE TABLE IF NOT EXISTS employee (id INT PRIMARY KEY,name VARCHAR(255),gender CHAR(1),age INT
);-- 插入数据
INSERT INTO employee VALUES (1, 'Alice', 'F', 22), (2, 'John', 'M', 25);
(2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入表 5-3 所示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。
idnamegenderage
3MaryF26
4TomM23
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame# 创建 SparkSession
#"/path/to/mysql-connector-java-x.x.xx.jar":实际的 MySQL Connector/J JAR 文件路径。
spark = SparkSession.builder.appName("MySQLDataFrame").config("spark.jars", "/path/to/mysql-connector-java-x.x.xx.jar" 
).getOrCreate()# 读取数据到 DataFrame
employee_data = [(3, 'Mary', 'F', 26), (4, 'Tom', 'M', 23)]
columns = ["id", "name", "gender", "age"]
new_data_df = spark.createDataFrame(employee_data, columns)# 配置 MySQL 连接信息
mysql_url = "jdbc:mysql://localhost:3306/sparktest"
mysql_properties = {"user": "your_username",# 实际的 MySQL 用户名"password": "your_password",# 实际的 MySQL 密码"driver": "com.mysql.cj.jdbc.Driver"
}# 将数据写入 MySQL 表
new_data_df.write.jdbc(url=mysql_url, table="employee", mode="append", properties=mysql_properties)# 从 MySQL 中读取数据到 DataFrame
employee_df = spark.read.jdbc(url=mysql_url, table="employee", properties=mysql_properties)# 打印 DataFrame 的所有数据
employee_df.show()# 打印 age 的最大值和总和
employee_df.agg({"age": "max", "age": "sum"}).show()# 停止 SparkSession
spark.stop()

本文结束欢迎点赞,收藏,有问题可以在评论区讨论!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/233895.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Java SSM框架实现美食推荐管理系统项目【项目源码+论文说明】计算机毕业设计

基于java的SSM框架实现美食推荐管理系统演示 摘要 21世纪的今天,随着社会的不断发展与进步,人们对于信息科学化的认识,已由低层次向高层次发展,由原来的感性认识向理性认识提高,管理工作的重要性已逐渐被人们所认识&a…

使用Jetty编写RESTful接口

一、依赖 <!--Jetty服务器的核心依赖项&#xff0c;用于创建和管理服务器。--><dependency><groupId>org.eclipse.jetty</groupId><artifactId>jetty-server</artifactId><version>9.4.43.v20210629</version></dependency…

样品实验Oxfilm351CN高沸点低VOC成膜助剂TDS说明书

样品实验Oxfilm351CN高沸点低VOC成膜助剂TDS说明书 1KG/瓶

在Windows中加密文件或文件夹不需要太大的努力就可以实现,主要有两种加密方法

如果你正在寻找一种在Windows计算机上保持文件和文件夹隐私的简单方法,你有几个选择。 得益于Microsoft Office Suite,你可以使用内置的加密功能对Office文件(如Word文档或PowerPoint演示文稿)进行密码保护。 一些Windows操作系统还配备了加密文件系统(EFS),可以对任何…

建设“参与城市”大学--SMU在2023年绿色金融全球论坛上分享观点

2023年11月21日&#xff0c;由新加坡管理大学&#xff08;SMU&#xff0c;简称新大&#xff09;和中国人民大学&#xff08;RUC&#xff0c;简称人大&#xff09;联合主办的“绿色金融与治理&#xff1a;从承诺到行动”全球论坛在北京召开。论坛汇集了来自新加坡、中国及世界各…

Linux系统iptables

目录 一. 防火墙简介 1. 防火墙定义 2. 防火墙分类 ①. 网络层防火墙 ②. 应用层防火墙 二. iptables 1. iptables定义 2. iptables组成 ①. 规则表 ②. 规则链 3. iptables格式 ①. 管理选项 ②. 匹配条件 ③. 控制类型 四. 案例说明 1. 查看规则表 2. 增加新…

详解云WAF:免费GOODWAF归来

文前聊心 说说这篇文章的目的&#xff1a; 介绍一下自己的开发升级的项目&#xff1a;GOODWAF&#xff0c;看名字也能看的出来这是一款防火墙&#xff0c;但它不同于现在的软件防火墙&#xff0c;它是一款云WAF防火墙。 其实GOODWAF这个IP概念前两年就存在了&#xff0c;但为什…

Python —— Mock接口测试

前言 今天跟小伙伴们一起来学习一下如何编写Python脚本进行mock测试。 什么是mock? 测试桩&#xff0c;模拟被测对象的返回&#xff0c;用于测试 通常意义的mock指的就是mock server, 模拟服务端返回的接口数据&#xff0c;用于前端开发&#xff0c;第三方接口联调 为什么…

Python MD5加密的三种方法(可加盐)

方法一&#xff1a;MD5直接加密 import hashlibtext1123456 print(text1) mdhashlib.md5(text1.encode()) # 创建md5对象 md5pwdmd.hexdigest() # md5加密 print(md5pwd) 输出结果&#xff1a; 方法二&#xff1a;MD5盐加密&#xff0c;将盐拼接在原密码后 import ha…

生成式AI与预测式AI的主要区别与实际应用

近年来&#xff0c;预测式人工智能&#xff08;Predictive AI&#xff09;通过先进的推荐算法、风险评估模型、以及欺诈检测工具&#xff0c;一直在推高着该领域公司的投资回报率。然而&#xff0c;今年初突然杀出的生成式人工智能&#xff08;Generative AI&#xff09;突然成…

直播前期准备

直播前的准备是一个综合性的过程&#xff0c;需要从多个方面进行考虑和准备。以下是一些直播前准备的参考∶ 1.确定直播主题和目标∶明确直播的主题和目标&#xff0c;以及如何吸引观众。考虑观众的兴趣和需求&#xff0c;选择一个熟悉且具有吸引力的主题&#xff0c;以提升直…

8.0 新特性 - Generated Invisible Primary Key

文章目录 说明1. GIPK 介绍1.1 参数设置2.2 可见性测试2.3 修改元数据可见性2.4 修改查询可见性 2. GIPK 测试2.1 Binlog 分析2.2 主从复制2.3 逻辑备份2.4 其它限制2.4.1 AUTO_INCREMENT 属性2.4.2 my_row_id 关键字 后记 说明 MySQL Innodb 引擎采用的是 IOT&#xff08;索引…