累加器 - 分布式共享写变量

水善利万物而不争,处众人之所恶,故几于道💦

文章目录

      • 概念
      • 注意:
      • 应用

概念

  因为RDD是可分区的,每个分区在不同的节点上运行,如果想要对某个值进行全局累加,就需要将每个task中的值取到然后进行累加,而各个Executor之间是不能相互读取对方数据的,所以就没办法在task里面进行最终累加结果的输出,所以就需要一个全局统一的变量来处理。

用下面的代码举例:

@Test
def test(): Unit = {import org.apache.spark.{SparkConf, SparkContext}val conf: SparkConf = new SparkConf().setAppName("SparkCoreStudy").setMaster("local[4]")val sc = new SparkContext(conf)var sum = 0val rdd = sc.parallelize(List(10,20,30,40,60))rdd.foreach(x=>{sum+=x})println(sum)
}

以上代码的输出结果是 0
分析:因为函数体外的代码是在Driver端执行的,函数体内的代码是在task里面执行的,而上述代码中创建sum变量是在Driver端创建的,函数体内的代码sum+=x是在task里面执行的,task里面对sum进行了累加,然后在Driver端打印sum的值,打印出来的还是Driver端sum的初始值0


  以下代码是用累加器实现的相同功能
@Test
def test(): Unit = {import org.apache.spark.{SparkConf, SparkContext}val conf: SparkConf = new SparkConf().setAppName("SparkCoreStudy").setMaster("local[4]")val sc = new SparkContext(conf)// 创建累加器从sparkContext中val acc = sc.longAccumulator("sumAcc")val rdd = sc.parallelize(List(10,20,30,40,60))//    rdd.foreach(x=>{//      sum+=x//    })// 将要累加的变量放到累加器中rdd.foreach(x=>{acc.add(x)})// 打印累加器的值println(acc.value)}

上面代码的输出结果是160
分析:上述代码中从SparkContext中创建了longAccumulator累加器,起名为sumAcc,然后在task中执行累加的时候调用改累加器的add()方法将要累加的变量加入到累加器中,最后在driver端调用累加器的value方法取出累加器的值,所以就会得出160

总结
累加器的创建:val acc = sc.longAccumulator(“sumAcc”)
向累加器中添加数据:acc.add(x)
取出累加器中的数据:acc.value

注意:

  • Executor端不要获取累加器的值,那样取到的那个值不是累加器最终的累加结果,因为累加器是一个分布式共享的写变量
  • 使用累加器时(也就是向累加器中添加要累加的变量的时候)要将其放在行动算子中。因为在行动算子中这个累加器的值可以保证绝对可靠,如果在转换算子中使用累加器,假如这个spark应用程序有多个job,每个job执行的时候都会执行一遍转换算子,那么这个累加器就会被累加多次,这个值也就不准确了。所以累加器要在行动算子中使用。

应用

  累加器在某些场景下可以避免shuffle。spark中自带的累加器有三个longAccumulator()doubleAccumulator()collectionAccumulator[](),比较常用的是collectionAccumulator[]()

我们用累加器实现WordCount
下面是正常的WordCount代码:

  def main(args: Array[String]): Unit = {import org.apache.spark.{SparkConf, SparkContext}val conf: SparkConf = new SparkConf().setAppName("SparkCoreStudy").setMaster("local[*]")val sc = new SparkContext(conf)val rdd1 = sc.textFile("datas/wc.txt")val rdd2 = rdd1.flatMap(_.split(" "))val rdd3 = rdd2.map((_, 1))rdd3.reduceByKey(_ + _).collect().foreach(println)Thread.sleep(100000000)}

结果:
在这里插入图片描述查看web页面:
在这里插入图片描述

下面用collectionAccumulator[]()累加器实现,用累加器替换掉reduceByKey

def main(args: Array[String]): Unit = {import org.apache.spark.{SparkConf, SparkContext}val conf: SparkConf = new SparkConf().setAppName("SparkCoreStudy").setMaster("local[*]")val sc = new SparkContext(conf)//    定义集合累加器,并声明累加器中存放的元素类型    可变的map,可变就是可以直接在原来的集合上修改不会返回新的集合val acc = sc.collectionAccumulator[mutable.Map[String,Int]]("accNum")val rdd1 = sc.textFile("datas/wc.txt")val rdd2 = rdd1.flatMap(_.split(" "))val rdd3 = rdd2.map((_, 1))//    rdd3.reduceByKey(_ + _).collect().foreach(println)// 遍历每个二元元组rdd3.foreachPartition(it=>{// 定义一个map用来存放一个分区的累加结果val resMap =  mutable.Map[String,Int]()// 遍历分区的每个元素(hadoop,1)it.foreach(y=>{// 从resMap中取这个元素的key看有没有,没有的话返回0val num = resMap.getOrElse(y._1, 0)// 将取到的数和元素的标记1累加val num2 = num+y._2// 写回到map中resMap.put(y._1,num2)})// 将每个分区的累加结果添加到累加器中acc.add(resMap)})//    println(rdd4.collect())// 在driver端取到累加器的结果,这个结果是Java类型的listval res = acc.value// 添加 scala.collection.JavaConverters._ 用里面的asScala将Java类型的list转为Scala类型import scala.collection.JavaConverters._val scalaList = res.asScala//    将list里面的map压掉,剩下()元组val flatten = scalaList.flatten//  以元组的key分组val grouped = flatten.groupBy(_._1)// 得出每个分组内的次数和val res_end = grouped.map(x => {// 将元组中的第二个次数来一次map然后sum,取出每个单词的频率val sum = x._2.map(y => y._2).sum(x._1, sum)})res_end.foreach(println)Thread.sleep(100000000)
}

结果:
在这里插入图片描述查看web页面:
在这里插入图片描述
通过对比两次的web页面可以发现,使用reduceByKey会有一次shuffle,使用累加器替换掉reduceByKey实现相同的功能,没有产生shuffle,因此累加器在某些聚合场景下可以避免掉shuffle从而在一定程度上提高性能

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/468130.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C语言进阶】深度剖析数据在内存中的存储--上

1. C语言中的数据类型的简单介绍 注:C99标准里面,定义了bool类型变量。这时,只要引入头文件stdbool.h ,就能在C语言里面正常使用bool类型。 1.1 在C语言中各类型所占内存空间的大小如下 char类型的数据类型大小为1字节即8比特位。…

[NSSCTF]-Web:[SWPUCTF 2021 新生赛]easyrce解析

先看网页 代码审计: error_reporting(0); :关闭报错,代码的错误将不会显示 highlight_file(__FILE__); :将当前文件的源代码显示出来 eval($_GET[url]); :将url的值作为php代码执行 解题: 题目既然允许…

ChatGPT高效提问—prompt实践(视频制作)

ChatGPT高效提问—prompt实践(视频制作) 1.1 视频制作 ​ 制作视频对于什么都不懂的小白来说非常难。而随着AI技术的发展,这件事变得越来越简单,如今小白也可以轻松上手。如何借助ChatGPT来制作短视频。 ​ 其实方法非常简单&a…

SolidWorks学习笔记——入门知识1

目录 1、固定最近文档 2、根据需要自定义菜单栏 3、根据需要增添选项卡 4、命令搜索框 5、鼠标右键长按快速切换视图 6、鼠标笔势 自定义鼠标笔势 1、固定最近文档 图1 固定最近文档 2、根据需要自定义菜单栏 图2 根据需要自定义菜单栏 3、根据需要增添选项卡 图3 根据…

javaspringbootMySQL高考志愿选择系统68335-计算机毕业设计项目选题推荐(附源码)

目 录 摘要 第1章 绪论 1.1 研究背景与意义 1.2 研究现状 1.3论文结构与章节安排 第2章 相关技术 2.1开发技术 2.2 Java简介 2.3 MVVM模式 2.4 B/S结构 2.5 MySQL数据库 2.6 SpringBoot框架介绍 第3章 系统分析 3.1 可行性分析 3.2 系统流程分析 3.2.1 数…

【开源】SpringBoot框架开发企业项目合同信息系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 合同审批模块2.3 合同签订模块2.4 合同预警模块2.5 数据可视化模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 合同审批表3.2.2 合同签订表3.2.3 合同预警表 四、系统展示五、核心代码5.1 查询合同…

Java并发基础:DelayQueue全面解析!

内容概要 DelayQueue类专为处理延迟任务设计,它允许开发者将任务与指定的延迟时间关联,并在任务到期时自动处理,从而避免了不必要的轮询和资源浪费,此外,DelayQueue内部基于优先队列实现,确保最先到期的任…

2024年幻兽帕鲁服务器搭建方法_图文保姆级教程

幻兽帕鲁官方服务器不稳定?自己搭建幻兽帕鲁服务器,低延迟、稳定不卡,目前阿里云和腾讯云均推出幻兽帕鲁专用服务器,腾讯云直接提供幻兽帕鲁镜像系统,阿里云通过计算巢服务,均可以一键部署,鼠标…

【Jmeter】JDK及Jmeter的安装部署及简单配置

JDK的安装和环境变量配置 对于Linux、Mac和Windows系统,JDK的安装和环境变量配置方法略有不同。以下是针对这三种系统的详细步骤: 对于Linux系统: 下载适合Linux系统的JDK安装包,可以选择32位或64位的版本。 将JDK的安装包放置…

社区经营的好处与优势:为何越来越多的人选择社区店?

社区店,这个曾经被视为小型、局限的商业模式,如今正逐渐崭露头角,成为众多创业者和消费者的首选。 特别是在鲜奶吧这样的细分市场中,社区店更是展现出了其独特的魅力和优势。作为一名拥有五年鲜奶吧经营经验的创业者,…

【设计模式】springboot3项目整合模板方法深入理解设计模式之模板方法(Template Method)

🎉🎉欢迎光临🎉🎉 🏅我是苏泽,一位对技术充满热情的探索者和分享者。🚀🚀 🌟特别推荐给大家我的最新专栏《Spring 狂野之旅:底层原理高级进阶》 &#x1f680…

【国产MCU】-CH32V307-基本定时器(BCTM)

基本定时器(BCTM) 文章目录 基本定时器(BCTM)1、基本定时器(BCTM)介绍2、基本定时器驱动API介绍3、基本定时器使用实例CH32V307的基本定时器模块包含一个16 位可自动重装的定时器(TIM6和TIM7),用于计数和在更新新事件产生中断或DMA 请求。 本文将详细介绍如何使用CH32…