spark的算子

spark的算子

在这里插入图片描述

1.spark的单Value算子

Spark中的单Value算子是指对一个RDD中的每个元素进行操作,并返回一个新的RDD。下面详细介绍一些常用的单Value算子及其功能:

  1. map:逐条映射,将RDD中的每个元素通过指定的函数转换成另一个值,最终返回一个新的RDD。
rdd = sc.parallelize([1, 2, 3, 4, 5])
result = rdd.map(lambda x: x * 2)
# result: [2, 4, 6, 8, 10]
  1. flatMap: 扁平化映射,将RDD中的每个元素通过指定的函数转换成多个值,并将这些值展开为一个新的RDD。
rdd = sc.parallelize([1, 2, 3, 4, 5])
result = rdd.flatMap(lambda x: range(x, x+3))
# result: [1, 2, 3, 2, 3, 4, 3, 4, 5, 4, 5, 6, 5, 6, 7]
  1. glom:将一个分区中的多个单条数据转换为相同类型的单个数组进行处理。返回一个新的RDD,其中每个元素是一个数组。
rdd = sc.parallelize([1, 2, 3, 4, 5], 2)  # 两个分区
result = rdd.glom().collect()
# result: [[1, 2], [3, 4, 5]]
  1. groupBy: 将RDD中的元素按照指定条件分组,返回一个键值对RDD,其中的每个元素是一个(key, iterator)对,key为分组的条件,iterator为对应分组的元素迭代器。
rdd = sc.parallelize(['apple', 'banana', 'cherry', 'date'])
result = rdd.groupBy(lambda x: x[0])
# result: [('a', ['apple']), ('b', ['banana']), ('c', ['cherry']), ('d', ['date'])]
  1. filter:根据指定的规则过滤出符合条件的元素,返回一个新的RDD。
rdd = sc.parallelize([1, 2, 3, 4, 5])
result = rdd.filter(lambda x: x % 2 == 0)
# result: [2, 4]
  1. sample:从RDD中进行采样,返回一个包含采样结果的新的RDD。
rdd = sc.parallelize(range(10))
result = rdd.sample(False, 0.5)
# result: [0, 2, 3, 4, 5, 7]
  1. distinct(shuffle):去重,将RDD中重复的元素去除,返回一个由不重复元素组成的新的RDD。
rdd = sc.parallelize([1, 2, 2, 3, 3, 3])
result = rdd.distinct()
# result: [1, 2, 3]
  1. coalesce(shuffle):缩减分区,将RDD的分区数缩减为指定的数量。
rdd = sc.parallelize([1, 2, 3, 4, 5], 4)  # 4个分区
result = rdd.coalesce(2)
# result: [1, 2, 3, 4, 5](分区数变为2)
  1. repartition(shuffle):扩增分区数,底层是coalesce。将RDD的分区数扩增到指定的数量。
rdd = sc.parallelize([1, 2, 3, 4, 5], 2)  # 2个分区
result = rdd.repartition(4)
# result: [1, 2], [3, 4], [5](分区数变为4)
  1. sortBy(shuffle):根据指定的规则对数据源中的数据进行排序,默认为升序。
rdd = sc.parallelize([3, 1, 4, 2, 5])
result = rdd.sortBy(lambda x: x)
# result: [1, 2, 3, 4, 5]

这些单Value算子能够对RDD中的每个元素进行处理,并返回一个新的RDD,可以用于各种数据转换、过滤、去重等操作。

2. Spark的双Value算子

双Value算子是指对两个RDD进行操作,并返回一个新的RDD。下面介绍一些常用的双Value算子及其功能:

  1. union: 对两个RDD求并集,返回包含两个RDD中所有元素的新RDD。
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
result = rdd1.union(rdd2)
# result: [1, 2, 3, 3, 4, 5]
  1. intersection: 对两个RDD求交集,返回包含两个RDD中共有元素的新RDD。
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
result = rdd1.intersection(rdd2)
# result: [3]
  1. subtract: 对两个RDD求差集,返回只属于第一个RDD而不属于第二个RDD的元素的新RDD。
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
result = rdd1.subtract(rdd2)
# result: [1, 2]
  1. cartesian: 对两个RDD进行笛卡尔积操作,返回所有可能的元素对组成的新RDD。
rdd1 = sc.parallelize([1, 2])
rdd2 = sc.parallelize(['a', 'b'])
result = rdd1.cartesian(rdd2)
# result: [(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b')]
  1. zip: 将两个RDD的元素按照索引位置进行配对,返回键值对组成的新RDD。
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize(['a', 'b', 'c'])
result = rdd1.zip(rdd2)
# result: [(1, 'a'), (2, 'b'), (3, 'c')]
  1. join: 对两个键值对RDD进行内连接操作,返回具有相同键的元素对组成的新RDD。
rdd1 = sc.parallelize([(1, 'apple'), (2, 'banana')])
rdd2 = sc.parallelize([(1, 'red'), (2, 'yellow')])
result = rdd1.join(rdd2)
# result: [(1, ('apple', 'red')), (2, ('banana', 'yellow'))]
  1. leftOuterJoin: 对两个键值对RDD进行左外连接操作,返回左侧RDD中所有元素以及与之匹配的右侧RDD中的元素对组成的新RDD。
rdd1 = sc.parallelize([(1, 'apple'), (2, 'banana')])
rdd2 = sc.parallelize([(1, 'red'), (3, 'yellow')])
result = rdd1.leftOuterJoin(rdd2)
# result: [(1, ('apple', 'red')), (2, ('banana', None))]
  1. rightOuterJoin: 对两个键值对RDD进行右外连接操作,返回右侧RDD中所有元素以及与之匹配的左侧RDD中的元素对组成的新RDD。
rdd1 = sc.parallelize([(1, 'apple'), (2, 'banana')])
rdd2 = sc.parallelize([(1, 'red'), (3, 'yellow')])
result = rdd1.rightOuterJoin(rdd2)
# result: [(1, ('apple', 'red')), (3, (None, 'yellow'))]

这些双Value算子能够对两个RDD进行操作,并返回一个新的RDD,可以用于求并集、交集、差集等操作,也可以进行连接操作,根据键值对进行配对。

3. Spark的Key-Value算子

Key-Value算子是指对键值对RDD进行操作的算子,这些算子主要用于处理具有键值对结构的数据,其中键位于第一列,值位于第二列。下面介绍一些常用的Key-Value算子及其功能:

  1. reduceByKey: 对具有相同键的元素进行聚合操作,返回一个新的键值对RDD。
rdd = sc.parallelize([(1, 2), (1, 3), (2, 4), (2, 5)])
result = rdd.reduceByKey(lambda x, y: x + y)
# result: [(1, 5), (2, 9)]
  1. groupByKey: 对具有相同键的元素进行分组操作,返回一个新的键值对RDD。
rdd = sc.parallelize([(1, 2), (1, 3), (2, 4), (2, 5)])
result = rdd.groupByKey()
# result: [(1, <pyspark.resultiterable.ResultIterable object at 0x7f3128a3e370>), (2, <pyspark.resultiterable.ResultIterable object at 0x7f3128a3e3d0>)]
  1. sortByKey: 按照键的顺序对RDD进行排序操作,默认升序排列。
rdd = sc.parallelize([(3, 'apple'), (1, 'banana'), (2, 'orange')])
result = rdd.sortByKey()
# result: [(1, 'banana'), (2, 'orange'), (3, 'apple')]
  1. mapValues: 对键值对RDD中的值进行操作,返回一个新的键值对RDD。
rdd = sc.parallelize([(1, 'apple'), (2, 'banana')])
result = rdd.mapValues(lambda x: 'fruit ' + x)
# result: [(1, 'fruit apple'), (2, 'fruit banana')]
  1. flatMapValues: 对键值对RDD中的值进行扁平化操作,返回一个新的键值对RDD。
rdd = sc.parallelize([(1, 'hello world'), (2, 'goodbye')])
result = rdd.flatMapValues(lambda x: x.split())
# result: [(1, 'hello'), (1, 'world'), (2, 'goodbye')]
  1. keys: 返回所有键组成的一个新的RDD。
rdd = sc.parallelize([(1, 'apple'), (2, 'banana')])
result = rdd.keys()
# result: [1, 2]
  1. values: 返回所有值组成的一个新的RDD。
rdd = sc.parallelize([(1, 'apple'), (2, 'banana')])
result = rdd.values()
# result: ['apple', 'banana']

除了上述提到的常用Key-Value算子,还有一些其他常见的Key-Value算子,它们在处理键值对RDD时也非常有用。以下是其中几个:

  1. countByKey: 统计每个键出现的次数,返回一个字典。
rdd = sc.parallelize([(1, 'apple'), (1, 'banana'), (2, 'orange'), (2, 'banana')])
result = rdd.countByKey()
# result: {1: 2, 2: 2}
  1. collectAsMap: 将键值对RDD转换为字典形式。
rdd = sc.parallelize([(1, 'apple'), (2, 'banana')])
result = rdd.collectAsMap()
# result: {1: 'apple', 2: 'banana'}
  1. lookup: 查找具有给定键的所有值,并返回一个列表。
rdd = sc.parallelize([(1, 'apple'), (2, 'banana'), (1, 'orange')])
result = rdd.lookup(1)
# result: ['apple', 'orange']
  1. foldByKey: 对具有相同键的元素进行折叠操作,类似于reduceByKey,但可以指定初始值。
rdd = sc.parallelize([(1, 2), (1, 3), (2, 4), (2, 5)])
result = rdd.foldByKey(0, lambda x, y: x + y)
# result: [(1, 5), (2, 9)]
  1. aggregateByKey: 对具有相同键的元素进行聚合操作,可以指定初始值和两个函数:一个用于局部聚合,另一个用于全局聚合。
rdd = sc.parallelize([(1, 2), (1, 3), (2, 4), (2, 5)])
result = rdd.aggregateByKey(0, lambda x, y: x + y, lambda x, y: x + y)
# result: [(1, 5), (2, 9)]

这些Key-Value算子能够对键值对RDD进行操作,实现聚合、分组、排序、映射等功能。使用这些算子可以更方便地处理具有键值对结构的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/212266.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Laravel 安装(笔记一)

目录 第一步、Laravel 一般使用 composer安装 第二步、使用composer安装项目 第三步、配置环境 第四步、访问域名&#xff0c;安装完成 Laravel 官网 l​​​​​​​Installation - Laravel 中文网 为 Web 工匠创造的 PHP 框架 第一步、Laravel 一般使用 composer安装 如…

java项目之消防物资存储系统(ssm+vue)

项目简介 消防物资存储系统实现了以下功能&#xff1a; 管理员功能: 管理员登陆后&#xff0c;主要模块包括首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;仓库管理&#xff0c;物资入库管理&#xff0c;物资出库管理&#xff0c;仓库管理&#xff0c;物资详情管…

基于Pytest+Requests+Allure实现接口自动化测试!

一、整体结构 框架组成&#xff1a;pytestrequestsallure 设计模式&#xff1a; 关键字驱动 项目结构&#xff1a; 工具层&#xff1a;api_keyword/ 参数层&#xff1a;params/ 用例层&#xff1a;case/ 数据驱动&#xff1a;data_driver/ 数据层&#xff1a;data/ 逻辑层&…

jetson xavier NX深度学习环境配置

文章目录 jetson xavier NX深度学习环境配置1. SD卡系统烧录1.1 材料1.2 软件配置1.3 格式化SD卡1.4 系统镜像烧录 2. 环境配置2.1 cuda环境配置2.2 安装依赖库2.3 安装python及依赖环境2.4 安装pytorch环境 jetson xavier NX深度学习环境配置 1. SD卡系统烧录 1.1 材料 SD …

python opencv 放射变换和图像缩放-实现图像平移旋转缩放

python opencv 放射变换和图像缩放-实现图像平移旋转缩放 我们实现这次实验主要用到cv2.resize和cv2.warpAffine cv2.warpAffine主要是传入一个图像矩阵&#xff0c;一个M矩阵&#xff0c;输出一个dst结果矩阵&#xff0c;计算公式如下&#xff1a; cv2.resize则主要使用fx&…

CCF CSP认证 历年题目自练Day49

题目一 此题用暴力枚举做过&#xff08;80分&#xff09;现如今终于用二维前缀和做到满分。 试题编号&#xff1a; 202309-2 试题名称&#xff1a; 坐标变换&#xff08;其二&#xff09; 时间限制&#xff1a; 2.0s 内存限制&#xff1a; 512.0MB 问题描述&#xff1a; 问题…

内建组件和模块

讨论 Vue.js 中几个非常重要的内建组件和模块&#xff0c;例如 KeepAlive 组件、Teleport 组件、Transition 组件等&#xff0c;它们都需要渲染器级别的底层支持。另外&#xff0c;这些内建组件所带来的能力&#xff0c;对开发者而言非常重要且实用&#xff0c;理解它们的工作原…

计算机基础知识57

前后端数据传输的编码格式(contentType) # 我们只研究post请求方式的编码格式&#xff1a; get请求方式没有编码格式-- index?useranme&password get请求方式没有请求体&#xff0c;参数直接在url地址的后面拼接着 # 有哪些方式可以提交post请求&#xff1a;f…

《山水间的家》第二季收官,国台酒业解锁中国式浪漫

执笔 | 洪大大 编辑 | 萧 萧 近日&#xff0c;由国台酒特别支持的大型文旅探访节目《山水间的家》第二季在总台央视综合频道&#xff08;CCTV-1&#xff09;正式收官。 第二季节目以家庭为视角切入&#xff0c;先后走进江苏、四川、重庆、江西、湖北、贵州、浙江等地24个特色…

java项目之网络在线考试系统(ssm)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于ssm的网络在线考试系统。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; &#xff08;1&#xff09;登录…

关于禅道的安装配置以及项目管理、团队协同工作

目录 一、禅道是什么&#xff1f; 二、特点和功能 三、安装禅道 3.1 下载官网 3.2 版本考虑 3.3 禅道使用手册参考 3.4 Windows端安装禅道 四、启动禅道 4.1 访问禅道 四、禅道部分功能的使用 4.1 添加项目集 4.2 启动/关闭项目 4.3 项目计划仪表盘/阶段目标/研发…

java.util.HashMap cannot be castto org.imeta.orm.base.BizObject

if(null !biz.get("headParallel")){biz.set("headParallel", new BizObject(biz.get("headParallel"))); } 类型给他转换一下930bip旗舰版