大数据分布式计算工具Spark数据计算实战讲解(filter方法,distinct方法,sortby方法)

练习案例

# #单词统计计数
from pyspark import SparkConf, SparkContext
import os
os.environ['pyspark_python'] = "D:/python/JIESHIQI/python.exe"
#创建一个sparkconf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")#基于sparkconf类对象创建sparkcontext类对象
sc = SparkContext(conf=conf)
#读取文件信息
rdd = sc.textFile("D:/hello.txt")
#取出全部的单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
print(word_rdd.collect())
#['apple', 'bean', 'banana', 'spark', 'haoop', 'python', 'java', 'go', 'c++']#将所有单词都转换成二元元组,单词为key,value设置为1
rdd2 = word_rdd.map(lambda word: (word, 1))
print(rdd2.collect())
#[('apple', 1), ('bean', 1), ('banana', 1), ('spark', 1), ('haoop', 1), ('python', 1), ('java', 1), ('go', 1), ('c++', 1)]#分组并求和
rdd3 = rdd2.reduceByKey(lambda a, b: a+b)
print(rdd3.collect())

 filter方法

功能:过滤想要的数据进行保留

from pyspark import SparkConf, SparkContext
import os
os.environ['pyspark_python'] = "D:/python/JIESHIQI/python.exe"
#创建一个sparkconf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")#基于sparkconf类对象创建sparkcontext类对象
sc = SparkContext(conf=conf)#准备一个rdd
rdd = sc.parallelize([1,2,3,4,5])
#对rdd的数据进行过滤
rdd2 = rdd.filter(lambda num: num % 2 == 0)print(rdd2.collect()) #[2, 4]

 distinct算子

功能:对rdd数据进行去重,返回新rdd

from pyspark import SparkConf, SparkContext
import os
os.environ['pyspark_python'] = "D:/python/JIESHIQI/python.exe"
#创建一个sparkconf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")#基于sparkconf类对象创建sparkcontext类对象
sc = SparkContext(conf=conf)#准备一个rdd
rdd = sc.parallelize([1,2,3,4,5,2,1,3,4,5])
#对rdd的数据进行过滤
# rdd2 = rdd.filter(lambda num: num % 2 == 0)print(rdd.distinct().collect()) 
#[1, 2, 3, 4, 5]

sortby算子

功能:对rdd数据进行排序,基于你指定的排序依据

from pyspark import SparkConf, SparkContext
import os
os.environ['pyspark_python'] = "D:/python/JIESHIQI/python.exe"
#创建一个sparkconf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")#基于sparkconf类对象创建sparkcontext类对象
sc = SparkContext(conf=conf)
#读取文件信息
rdd = sc.textFile("D:/hello.txt")
#取出全部的单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
print(word_rdd.collect())
#['apple', 'bean', 'banana', 'spark', 'haoop', 'python', 'java', 'go', 'c++']#将所有单词都转换成二元元组,单词为key,value设置为1
rdd2 = word_rdd.map(lambda word: (word, 1))
print(rdd2.collect())
#[('apple', 1), ('bean', 1), ('banana', 1), ('spark', 1), ('haoop', 1), ('python', 1), ('java', 1), ('go', 1), ('c++', 1)]#分组并求和
rdd3 = rdd2.reduceByKey(lambda a, b: a+b)
print(rdd3.collect())#对结果进行排序
final_rdd = rdd3.sortBy(lambda x: x[1],ascending=True,numPartitions=1)
print(final_rdd)

练习案例

需求,复制以上内容到文件中,使用Spark读取文件进行计算:

•各个城市销售额排名,从大到小

•全部城市,有哪些商品类别在售卖

•北京市有哪些商品类别在售卖

from pyspark import SparkConf, SparkContext
import os
import json
os.environ['pyspark_python'] = "D:/python/JIESHIQI/python.exe"
#创建一个sparkconf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")#基于sparkconf类对象创建sparkcontext类对象
sc = SparkContext(conf=conf)#读取文件得到rdd
file_rdd = sc.textFile("D:/hello.txt")
#取出一个个JSON字符串
josn_str = file_rdd.flatMap(lambda x: x.split("|"))
#将一个个json字符串转换为字典
dict_rdd = josn_str.map(lambda x: json.loads(x))
# print(dict_rdd.collect())
#取出城市和销售额的排行
value_rdd = dict_rdd.map(lambda x: (x['areaName'],int(x['money'])))
#按城市分组按销售额聚合
rdd2 = value_rdd.reduceByKey(lambda a,b: a+b)
result1 = rdd2.sortBy(lambda x: x[1],ascending=False,numPartitions=1)
print(result1.collect())
#[('北京', 91556), ('杭州', 28831), ('天津', 12260), ('上海', 1513), ('郑州', 1120)]
#取出全部的商品的类别
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print(category_rdd.collect())
#['电脑', '家电', '食品', '平板电脑', '手机', '家具', '书籍', '服饰']#过滤北京市的数据
beijing_data = dict_rdd.filter(lambda x: x['areaName'] == '北京')
print(beijing_data.collect())

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/508784.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为云命令行工具KooCLI—高效云端管理的秘诀

做运维多年,公司从传统运维改为云上。刚一接触时,确实因为要学习很多云知识而烦恼。每次想要执行某个操作时,都要先登录到云平台,浏览界面,寻找正确的按钮。这样不仅浪费时间,还经常出错。直到有一天&#…

【小白友好】LeetCode 打家劫舍 III

https://leetcode.cn/problems/house-robber-iii/description/ 前言 建议还是先看看动态规划的基础题再看这个。动态规划是不刷题,自己100%想不出来的。 基础题: 最大子数组和乘积最大子数组最长递增子序列 最大升序子数组和 小白想法 现在我们想遍…

云桥通+跨境电商:SDWAN企业组网优化跨境网络案例

跨境电商企业在全球范围内展开业务,需构建稳定高效的网络架构以支持其电商平台运营。云桥通SDWAN企业组网技术为跨境电商提供网络连接和管理的优化,提升网络性能、可靠性和安全性。以下是一家跨境电商企业的SDWAN组网案例,详细介绍其实施情况…

命名实体识别NER(综合代码示例)

一、命名实体识别发展方向 二、中文数据集 CCKS2017开放的中文的电子病例测评相关的数据。 评测任务一:https://biendata.com/competition/CCKS2017_1/ 评测任务二:https://biendata.com/competition/CCKS2017_2/ CCKS2018开放的音乐领域的实体识别任务…

【Qt】Sqlite数据库加密

1. 加密方式 对数据库文件加密。既不会暴露表结构,也不会暴露数据细节。 2. 加密工具(QtCipherSqlitePlugin) 用于密码 SQLite 的 Qt 插件,它基于 SQLite 源和 wxWidget 中的 wxSQLite3插件github地址:https://gith…

重读 Java 设计模式: 探索经典之道与 Spring 框架的设计

写在开头 记得大学刚毕业那会儿,想学点东西,于是拿出了《Head First 设计模式》这本书,就开始了阅读,我曾对这些模式感到晦涩难懂。然而,随着工作岁月的增长,我逐渐领悟到设计模式的价值,尤其是…

2024高频前端面试题 JavaScript 和 ES6 篇

HTML和CSS篇: 2024高频前端面试题 HTML 和 CSS 篇-CSDN博客 一. JavaScript篇 1. 数据类型有哪些 1) 基本数据类型 数值(Number)、字符串(String)、布尔值(Boolean)、Undefined、Null、Symbol、BigInt 2) 引用数据类型 对象(Object)、数组(Array)、函数(Funct…

为啥要用C艹不用C?

在很多时候,有人会有这样的疑问 ——为什么要用C?C相对于C优势是什么? 最近两年一直在做Linux应用,能明显的感受到C带来到帮助以及快感 之前,我在文章里面提到环形队列 C语言,环形队列 环形队列到底是怎么回…

keycloak-keycloak部署启动及打开调试日志

一、问题描述 keycloak截止目前已更新到23.0.7版本,好多网上关于keycloak教程都停留在15版本之前,有一些地方版本变化较大,计划写一个系列来记录keycloak使用。本文主要记录keycloak启动及打开调试日志的方法。 本文keycloak实验版本为23.0…

MySQL的初学者教程—Navicat的基本操作方法

MySQL的初学者教程—Navicat的基本操作方法 1、运行Navicat 双击桌面的Navicat 12 for MySQL。 2、新建MySQL连接 点击【测试连接】。 zyyMySQL的连接创建成功! 3、新建数据库 4、新建表 点击【保存】 表【usermanage】建好了。 点【usermanage】的鼠标右键&#…

PDA主要用来做什么?

PDA,可以满足信息采集、信息处理、信息查询的需求,实现信息的一体化管理,帮助企业迎来无纸化操作时代,减少失误、提高效率、提升企业竞争力,并最终赢得市场。主要具备以下几个功能:1、条码扫描条码扫描是手…

【蓝牙协议栈】【BR/EDR】【AVRCP】AVRCP常用指令介绍

1. AVRCP常用指令协议栈介绍 1.1 AVRCP Play 播放蓝牙音乐,使用AVCTP的AV/C格式的命令走控制通道 1.2 AVRCP Pause 暂停蓝牙音乐,同播放指令一样使用AV/C格式的命令走控制通道 实际使用中由于CT端可能会快速发送两次播放或暂停指令,会触发…