使用Spark探索数据

需求分析

使用Spark来探索数据是一种高效处理大规模数据的方法,需要对数据进行加载、清洗和转换,选择合适的Spark组件进行数据处理和分析。需求分析包括确定数据分析的目的和问题、选择合适的Spark应用程序和算法、优化数据处理流程和性能、可视化和解释分析结果。同时,需要熟悉Spark的基本概念和操作,掌握Spark编程和调优技巧,以确保数据探索的准确性和效率。

系统实现

了解实验目的

掌握python on Spark的使用理解探索数据的意义和方法,掌握使用Spark探索数据的过程。

1.实验整体流程分析:

  • 准备环境,安装Hadoop和Spark组件
  • 准备数据,采用开源movielens数据集
  • 探索用户数据
  • 探索电影数据
  • 探索电影评级数据

 2.准备数据:

  • 打开终端,启动Hadoop和Spark集群

  • 下载相关数据集

  • 将数据集解压到/usr/目录下

  • 上传数据至HDFS
# hadoop fs -mkdir /data
# hadoop fs -ls /
# hadoop fs -put /usr/data/u.user /data/u.user
# hadoop fs -put /usr/data/u.data /data/u.data
# hadoop fs -put /usr/data/u.genre /data/u.genre
# hadoop fs -put /usr/data/u.info /data/u.info
# hadoop fs -put /usr/data/u.item /data/u.item
# hadoop fs -put /usr/data/u.occupation /data/u.occupation
# hadoop fs -ls /data

上传后的HDFS的data目录结构如图所示

3.探索用户数据:

  • 打开终端,执行pyspark命令,进入Spark的python环境

  • 打印首行记录

运行结果如下

  • 分别统计用户、性别和职业的个数
# 以' | '切分每列,返回新的用户RDD
user_fields = user_data.map(lambda line: line.split("|"))
# 统计用户数
num_users = user_fields.map(lambda fields: fields[0]).count()
# 统计性别数
num_genders = user_fields.map(lambda fields: fields[2]).distinct().count()
# 统计职业数
num_occupations = user_fields.map(lambda fields: fields[3]).distinct().count()
# 统计邮编数
num_zipcodes = user_fields.map(lambda fields: fields[4]).distinct().count()
# 返回结果
print ("用户数: %d, 性别数: %d, 职业数: %d, 邮编数: %d" % (num_users, num_genders, num_occupations, num_zipcodes))

运行结果如下

  • 查看年龄分布情况,并用plt.show绘制

  • 查看职业分布情况,同样绘制图
# 并行统计各职业人数的个数,返回职业统计RDD后落地
count_by_occupation = user_fields.map(lambda fields: (fields[3], 1)).reduceByKey(lambda x, y: x + y).collect()
# 生成x/y坐标轴
x_axis1 = np.array([c[0] for c in count_by_occupation])
y_axis1 = np.array([c[1] for c in count_by_occupation])
x_axis = x_axis1[np.argsort(x_axis1)]
y_axis = y_axis1[np.argsort(y_axis1)]
# 生成x轴标签
pos = np.arange(len(x_axis))
width = 1.0
ax = plt.axes()
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(x_axis)
# 绘制职业人数条状图
plt.xticks(rotation=30)
plt.bar(pos, y_axis, width, color='lightblue')
plt.show()

 

  • 统计各职业人数

4.探索电影数据:

  • 重新打开终端,执行pyspark命令,进入Spark的python环境

  • 打印首行记录

  • 查看电影的数量

  • 过滤掉没有发现时间信息的记录

注意,输入时需要手动缩进

  • 查看影片的年龄分布并绘图

5.探索评级数据:

  • 重新打开终端,进入Spark的bin目录下,执行pyspark命令,进入Spark的python环境

  • 打印首行记录

  • 查看有多少人参与了评分

  • 统计最高、最低、平均、中位评分,以及平均每个用户的评分次数
# 以' | '切分每列,返回新的用户RDD
user_fields = user_data.map(lambda line: line.split("|"))
# 统计用户数
num_users = user_fields.map(lambda fields: fields[0]).count()
# 获取电影数量
num_movies = movie_data.count()
# 获取评分RDD
rating_data = rating_data_raw.map(lambda line: line.split("\t"))
ratings = rating_data.map(lambda fields: int(fields[2]))
# 计算最大/最小评分
max_rating = ratings.reduce(lambda x, y: max(x, y))
min_rating = ratings.reduce(lambda x, y: min(x, y))
# 计算平均/中位评分
mean_rating = ratings.reduce(lambda x, y: x + y) / float(num_ratings)
median_rating = np.median(ratings.collect())
# 计算每个观众/每部电影平均打分/被打分次数
ratings_per_user = num_ratings / num_users
ratings_per_movie = num_ratings / num_movies
# 输出结果
print("最低评分: %d" % min_rating)
print("最高评分: %d" % max_rating)
print("平均评分: %2.2f" % mean_rating)
print("中位评分: %d" % median_rating)
print("平均每个用户打分(次数): %2.2f" % ratings_per_user)
print("平均每部电影评分(次数): %2.2f" % ratings_per_movie)

  • 统计评分分布情况
# 生成评分统计RDD,并落地
count_by_rating = ratings.countByValue()
# 生成x/y坐标轴
x_axis = np.array(count_by_rating.keys())
y_axis = np.array([float(c) for c in count_by_rating.values()])
# 对人数做标准化
y_axis_normed = y_axis / y_axis.sum()
# 生成x轴标签
pos = np.arange(len(y_axis))
width = 1.0
ax = plt.axes()
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(y_axis)
# 绘制评分分布柱状图
plt.bar(pos, y_axis_normed, width, color='lightblue')
plt.xticks(rotation=30)
plt.show()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/505821.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mysql学习之MVCC解决读写问题

多版本并发控制 什么是MVCC MVCC (Multiversion Concurrency Control)多版本并发控制。顾名思义,MVCC是通过数据行的多个版本管理来实现数据库的并发控制。这项技术使得在InnoDB的事务隔离级别下执行一致性读操作有了保证。换言之&#xff0…

C++的内联函数

目录 前言 内联函数 为什么声明和定义分离 为什么声明和定义分离后不出错 为什么内联函数不支持声明和定义分离 为什么内联函数支持声明和定义不分离 坚持声明和定义不分离的解决方法 static修饰函数 inline修饰函数 结论 声明和定义不分离的应用场景 前言 在C语言…

二次元风格地址发布页源码

二次元风格地址发布页源码,源码由HTMLCSSJS组成,记事本打开源码文件可以进行内容文字之类的修改,双击html文件可以本地运行效果,也可以上传到服务器里面,重定向这个界面 下载地址 https://www.qqmu.com/2347.html

C3_W2_Collaborative_RecSys_Assignment_吴恩达_中英_Pytorch

Practice lab: Collaborative Filtering Recommender Systems(实践实验室:协同过滤推荐系统) In this exercise, you will implement collaborative filtering to build a recommender system for movies. 在本次实验中,你将实现协同过滤来构建一个电影推荐系统。 …

在什么时候企业档案才会发生调整

档案在企业中通常会调整在以下几个时刻: 1. 入职时:员工入职时,企业会要求员工提供个人档案,包括身份证件、学历证明、工作经历等相关文件。 2. 离职时:员工离职时,企业会整理员工的离职档案,包…

Spring重点记录

文章目录 1.Spring的组成2.Spring优点3.IOC理论推导4.IOC本质5.IOC实现:xml或者注解或者自动装配(零配置)。6.hellospring6.1beans.xml的结构为:6.2.Spring容器6.3对象的创建和控制反转 7.IOC创建对象方式7.1以有参构造的方式创建…

SpringCloud-MQ消息队列

一、消息队列介绍 MQ (MessageQueue) ,中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。消息队列是一种基于生产者-消费者模型的通信方式,通过在消息队列中存放和传递消息,实现了不同组件、服务或系统…

鸿蒙Harmony应用开发—ArkTS声明式开发(通用属性:尺寸设置)

用于设置组件的宽高、边距。 说明: 从API Version 7开始支持。后续版本如有新增内容,则采用上角标单独标记该内容的起始版本。 width width(value: Length) 设置组件自身的宽度,缺省时使用元素自身内容需要的宽度。若子组件的宽大于父组件的…

电子电气架构——汽车以太网诊断路由汇总

电子电气架构——汽车以太网诊断路由汇总 我是穿拖鞋的汉子,魔都中坚持长期主义的工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 人们会在生活中不断攻击你。他们的主要武器是向你灌输对自己的怀疑:你的价值、你的能力、你的潜力。他们往往会将…

代码随想录刷题笔记 DAY 38 | 不同路径 No.62 | 不同路径II No.63

文章目录 Day 3801. 不同路径&#xff08;No. 62&#xff09;<1> 题目<2> 笔记<3> 代码 02. 不同路径&#xff08;No. 63&#xff09;<1> 题目<2> 笔记<3> 代码 Day 38 01. 不同路径&#xff08;No. 62&#xff09; 题目链接 代码随想录…

Redis安全加固策略:服务账号管理 开启redis密码认证 开启防护模式

Redis安全加固策略&#xff1a;服务账号管理 & 开启redis密码认证 & 开启防护模式 1.1 服务账号管理1.1.1 检测方法1.1.2 加固参考配置操作 1.2 开启redis密码认证1.2.1 检测方法1.2.2 加固参考配置操作 1.3 开启防护模式1.3.1 检测方法1.3.2 加固参考配置操作 &#x…

Ubuntu进入python时报错:找不到命令 “python”,“python3” 命令来自 Debian 软件包 python3

一、错误描述 二、解决办法 进入”/usr/bin”目录下&#xff0c;查看/usr/bin目录中所有与python相关的文件和链接&#xff1a; cd /usr/bin ls -l | grep python 可以看到Python3指向的是Python3.10&#xff0c;而并无指向python3的软连接 只需要在python与python3之间手动…