spark的使用

spark的使用

spark是一款分布式的计算框架,用于调度成百上千的服务器集群。

安装pyspark

# os.environ['PYSPARK_PYTHON']='解析器路径' pyspark_python配置解析器路径
import os
os.environ['PYSPARK_PYTHON']="D:/dev/python/python3.11.4/python.exe"
pip install pyspark # 原始国外安装
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark  #网址安装

java安装

前置安装软件java包
java官网下载地址
一键下一步安装,配置环境变量
首先创建一个JAVA_HOME的全局变量然后在path中通过%%引入执行下面的bin 路径%JAVA_HOME%\bin

在这里插入图片描述
在这里插入图片描述
执行成功

from pyspark import SparkConf,SparkContext# 创建sparkConf 类对象
conf= SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 创建sparkConf类对象创建sparkContext对象
sc =SparkContext(conf=conf)
# 打印pySpark的运行脚本
print(sc.version)
# 停止sparkContext对象的运行(停止pySpark程序)
sc.stop()

PySpark的数据计算,都是基于RDD对象来进行的,RDD对象内置丰富的:成员方法(算子)

map算子

功能:map算子,是将RDD的数据一条条处理,处理的逻辑基于map算子中接收的处理函数,返回新的RDD语法:
在这里插入图片描述

# 简单执行map将数据乘以10返回,如果不引入python解析器的路径引入就会报错,
from pyspark import SparkConf, SparkContext
# 指定spark的python解析器路径
import os
os.environ['PYSPARK_PYTHON']="D:/dev/python/python3.11.4/python.exe"
# 创建sparkConf 类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 创建sparkConf类对象创建sparkContext对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 2, 3, 4, 5, 6])def func(data):return data * 10# map传入一个参数有返回值,是函数或者是值
rdd2 = rdd.map(func)
print(rdd2.collect())

在这里插入图片描述

flatMap

flatMapmap差不多就是在最后做了一个解除嵌套的功能

from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON']="D:/dev/python/python3.11.4/python.exe"
# 创建sparkConf 类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 创建sparkConf类对象创建sparkContext对象
sc = SparkContext(conf=conf)rdd = sc.parallelize(['中石科技 时间还复活甲 如今房价','慰问金 咖啡机 姐夫哥','格很高 客服管家二恶烷 可归结为'])rdd2 = rdd.flatMap(lambda x:x.split(' '))print(rdd2.collect())

在这里插入图片描述
map的结果
在这里插入图片描述

reduceByKey

reduceByKey对数据进行分组可以两两计算

from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = "D:/dev/python/python3.11.4/python.exe"
# 创建sparkConf 类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 创建sparkConf类对象创建sparkContext对象
sc = SparkContext(conf=conf)rdd = sc.parallelize([('男', 11), ('男', 22), ('女', 21), ('男', 31), ('女', 99)])
# 把男女进行分组value值进行计算
rdd2 = rdd.reduceByKey(lambda a, b:a+b)print(rdd2.collect()) # [('女', 120), ('男', 64)]

reduce

与reduce的区别就是没有进行分组

take

取出前几个数据

...
rdd = sc.parallelize([1,2,3,4,5]).take(3)  # [1,2,3]

count

计算rdd中的数据个数

filter

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='D:/dev/python/python3.11.4/python.exe'conf=SparkConf().setMaster('local[*]').setAppName('test_spark')
sc=SparkContext(conf=conf)rdd=sc.parallelize([1,2,3,4,5])rdd2=rdd.filter(lambda a:a%2==0) 
print(rdd2.collect()) # [2,4]

distinct

进行数据去重

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']='D:/dev/python/python3.11.4/python.exe'conf=SparkConf().setMaster('local[*]').setAppName('test_spark')
sc=SparkContext(conf=conf)add= sc.parallelize([1,2,3,4,5,6,73,3,2,4,56,3,5])add2=add.distinct()
print(add2.collect()) # [56, 1, 73, 2, 3, 4, 5, 6]

sortBy排序

from pyspark import SparkConf, SparkContext
import osos.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python3.11.4/python.exe'conf = SparkConf().setMaster('local[*]').setAppName('test_spark')
sc = SparkContext(conf=conf)add = sc.textFile('D:/wordText.txt')word_rdd = add.flatMap(lambda x: x.split(' '))
word_with_rdd = word_rdd.map(lambda word: (word, 1))
result_rdd =word_with_rdd.reduceByKey(lambda a,b:a+b)
result_num=result_rdd.sortBy(lambda x:x[1],ascending=False,numPartitions=1) # 1.根据什么排序,2.True 升序 False降序 3.分布式分区
print(result_num.collect())

collect

将rdd内容变成list,从而就可以打印出来

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/67647.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Java】2021 RoboCom 机器人开发者大赛-高职组(复赛)题解

7-8 人工智能打招呼 号称具有人工智能的机器人,至少应该能分辨出新人和老朋友,所以打招呼的时候应该能有所区别。本题就请你为这个人工智能机器人实现这个功能:当它遇到陌生人的时候,会说:“Hello X, how are you?”其…

用于构建生成式 AI 应用程序备忘单的最佳 Python 工具

推荐:使用 NSDT场景编辑器 助你快速搭建可二次编辑器的3D应用场景 新一代的声音 KDnuggets发布了一份富有洞察力的新备忘单,重点介绍了用于构建生成AI应用程序的顶级Python库。 毫无疑问,读者都知道,生成式人工智能是目前数据科…

汽车制造业上下游协作时 外发数据如何防泄露?

数据文件是制造业企业的核心竞争力,一旦发生数据外泄,就会给企业造成经济损失,严重的,可能会带来知识产权剽窃损害、名誉伤害等。汽车制造业,会涉及到重要的汽车设计图纸,像小米发送汽车设计图纸外泄事件并…

【Vue-Router】别名

后台返回来的路径名不合理&#xff0c;但多个项目在使用中了&#xff0c;不方便改时可以使用别名。可以有多个或一个。 First.vue <template><h1>First Seciton</h1> </template>Second.vue&#xff0c;Third.vue代码同理 UserSettings.vue <tem…

DTC服务(0x14 0x19 0x85)

DTC相关的服务有ReadDTCInformation (19) service&#xff0c;ControlDTCSetting (85) service和ReadDTCInformation (19) service ReadDTCInformation (19) service 该服务允许客户端从车辆内任意一台服务器或一组服务器中读取驻留在服务器中的诊断故障代码( DTC )信息的状态…

vmalert集成钉钉告警

vmalert通过在alert.rules中配置告警规则实现告警&#xff0c;告警规则语法与Prometheus兼容&#xff0c;依赖Alertmanager与prometheus-webhook-dingtalk实现钉钉告警&#xff0c;以下步骤&#xff1a; 1、构建vmalert 从源代码构建vmalert&#xff1a; git clone https://…

软考:中级软件设计师:文件管理,索引文件结构,树型文件结构,位示图,数据传输方式,微内核

软考&#xff1a;中级软件设计师: 提示&#xff1a;系列被面试官问的问题&#xff0c;我自己当时不会&#xff0c;所以下来自己复盘一下&#xff0c;认真学习和总结&#xff0c;以应对未来更多的可能性 关于互联网大厂的笔试面试&#xff0c;都是需要细心准备的 &#xff08;1…

【笔试题心得】关于KMP在笔试中的题型

好几家都考到KMP了 问的比较多的是 next数组 &#xff0c; 其实KMP的相关机制我在代码随想录算法训练营第九天|KMP算法_菜鸟的Zoom之旅的博客-CSDN博客中写道过&#xff0c;现在在复习一下&#xff0c;由于next数组的定义其实会有所歧义&#xff08;有些程序中会直接将前缀表作…

从一到无穷大 #10 讨论 Apache IoTDB 大综述中看到的优劣势

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。 本作品 (李兆龙 博文, 由 李兆龙 创作)&#xff0c;由 李兆龙 确认&#xff0c;转载请注明版权。 文章目录 引言问题定义新技术数据模型schemalessTsfile设计双MemTable高级可扩展查询其他 IotD…

基于熵权法对Topsis模型的修正

由于层次分析法的最大缺点为&#xff1a;主观性太强&#xff0c;影响判断&#xff0c;对结果有很大影响&#xff0c;所以提出了熵权法修正。 变异程度方差/标准差。 如何度量信息量的大小&#xff1a; 把不可能的事情变成可能&#xff0c;这里面就有很多信息量。 概率越大&…

Unity框架学习--5 事件中心管理器

作用&#xff1a;访问其它脚本时&#xff0c;不直接访问&#xff0c;而是通过发送一条“命令”&#xff0c;让监听了这条“命令”的脚本自动执行对应的逻辑。 原理&#xff1a; 1、让脚本向事件中心添加事件&#xff0c;监听对应的“命令”。 2、发送“命令”&#xff0c;事件…

java版工程项目管理系统源码+系统管理+系统设置+项目管理+合同管理+二次开发em

​ 鸿鹄工程项目管理系统 Spring CloudSpring BootMybatisVueElementUI前后端分离构建工程项目管理系统 1. 项目背景 一、随着公司的快速发展&#xff0c;企业人员和经营规模不断壮大。为了提高工程管理效率、减轻劳动强度、提高信息处理速度和准确性&#xff0c;公司对内部…