Spark---累加器和广播变量

文章目录

  • 1.累加器实现原理
  • 2.自定义累加器
  • 3.广播变量

1.累加器实现原理

累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量,在Executor 端的每个 Task 都会得到这个变量的一份新的副本,每个 task 更新这些副本的值后,传回 Driver 端进行 merge。

    //建立与Spark框架的连接val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件val context = new SparkContext(wordCount) //读取配置文件val dataRdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4),2)var sum=0dataRdd.foreach(num=>sum+=num)println(sum)context.stop()

运行结果:
在这里插入图片描述
我们预期是想要实现数据的累加,开始数据从Driver被传输到了Executor中进行计算,但是每个分区在累加数据完成之后并没有将计算结果返回到Driver端,所以导致最后的结果与预期的不一致。
在这里插入图片描述
对上述代码使用累加器

    val dataRdd: RDD[Int] = context.makeRDD(List(1, 2, 3, 4))val sum = context.longAccumulator("sum")dataRdd.foreach(num=>{//使用累加器sum.add(num)})//获取累加器的值println(sum.value)

运行结果:
在这里插入图片描述
由此可见,在使用了累加器之后,每个Executor在开始都会获得这个累加器变量,每个Executor在执行完成后,累加器会将每个Executor中累加器变量的值聚合到Driver端。
在这里插入图片描述

Spark提供了多种类型的累加器,以下是其中的一些:
在这里插入图片描述

2.自定义累加器

用户可以通过继承AccumulatorV2来自定义累加器。需求:自定义累加器实现WordCount案例。

AccumulatorV2[IN,OUT]中:
IN:输入数据的类型
OUT:输出数据类型

在这里插入图片描述
WordCount案例实现完整代码:

package bigdata.wordcount.leijiaqiimport bigdata.wordcount.leijiaqi
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutable/*** 使用累加器完成WordCount案例*/
object Spark_addDemo {def main(args: Array[String]): Unit = {//建立与Spark框架的连接val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件val context = new SparkContext(wordCount) //读取配置文件val dataRDD: RDD[String] = context.textFile("D:\\learnSoftWare\\IdeaProject\\Spark_Demo\\Spark_Core\\src\\main\\com.mao\\datas\\1.txt")//创建累加器对象val wordCountAccumulator = new WordCountAccumulator//向Spark中进行注册context.register(wordCountAccumulator,"wordCountAccumulator")//实现累加dataRDD.foreach(word => {wordCountAccumulator.add(word)})//获取累加结果,打印在控制台上println(wordCountAccumulator.value)//关闭链接context.stop()}}class WordCountAccumulator extends  AccumulatorV2[String,mutable.Map[String,Long]]
{//定义一个map用于存储累加后的结果var map: mutable.Map[String, Long] =mutable.Map[String,Long]()//累加器是否为初始状态override def isZero: Boolean = {map.isEmpty}//复制累加器override def copy(): AccumulatorV2[String, mutable.Map[String, Long]] = {new WordCountAccumulator()}//重置累加器override def reset(): Unit = {map.clear()}//向累加器添加数据INoverride def add(word: String): Unit = {// 查询 map 中是否存在相同的单词// 如果有相同的单词,那么单词的数量加 1// 如果没有相同的单词,那么在 map 中增加这个单词val newValue = map.getOrElse(word, 0L) + 1map.update(word,newValue)}//合并累加器override def merge(other: AccumulatorV2[String, mutable.Map[String, Long]]): Unit = {var map1=this.mapvar map2=other.value//合并两个mapmap2.foreach({case (word,count)=>{val newValue = map1.getOrElse(word,0L)+countmap1.update(word,newValue)}})}//返回累加器的结果(OUT)override def value: mutable.Map[String, Long] = this.map
}
}

运行结果:
在这里插入图片描述

3.广播变量

在Apache Spark中,广播变量(Broadcast Variables)是一种特殊类型的变量,用于优化数据共享。当Spark应用程序在集群中的多个节点上运行时,每个节点都需要访问相同的数据集。如果数据集很大,那么将这些数据发送到每个节点可能会非常耗时和低效。 广播变量提供了一种方法,可以在集群中的所有节点上共享数据集的一个只读副本 ,从而避免了在每个节点上重复发送数据。

在这里插入图片描述
广播变量也叫分布式只读变量,它可以将闭包数据发送到每个Executor的内存中来达到共享的目的,Executor其实就相当于一个JVM,在启动的时候会自动分配内存。

    //建立与Spark框架的连接val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") //配置文件val context = new SparkContext(wordCount) //读取配置文件val rdd1 = context.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)), 4)val list = List(("a", 4), ("b", 5), ("c", 6), ("d", 7))// 声明广播变量val broadcast: Broadcast[List[(String, Int)]] = context.broadcast(list)val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {case (key, num) => {var num2 = 0// 使用广播变量for ((k, v) <- broadcast.value) {if (k == key) {num2 = v}}(key, (num, num2))}}resultRDD.collect().foreach(print)context.stop()

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/409823.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【windows】右键添加git bash here菜单

在vs 里安装了git for windows 后&#xff0c;之前git-bash 右键菜单消失了。难道是git for windows 覆盖了原来自己安装的git &#xff1f;大神给出解决方案 手动添加Git Bash Here到右键菜单&#xff08;超详细&#xff09; 安装路径&#xff1a;我老的 &#xff1f; vs的gi…

Spring5深入浅出篇:Spring工厂设计模式拓展应用

Spring5深入浅出篇:Spring工厂设计模式拓展应用 简单工厂实现 这里直接上代码举例子 UserService.java public interface UserService {public void register(User user);public void login(String name, String password); }UserServiceImpl.java public class UserService…

Netty-Netty源码分析

Netty线程模型图 Netty线程模型源码剖析图 Netty高并发高性能架构设计精髓 主从Reactor线程模型NIO多路复用非阻塞无锁串行化设计思想支持高性能序列化协议零拷贝(直接内存的使用)ByteBuf内存池设计灵活的TCP参数配置能力并发优化 无锁串行化设计思想 在大多数场景下&#…

【计算机网络】网络层——详解IP协议

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【网络编程】 本专栏旨在分享学习计算机网络的一点学习心得&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 目录 &#x1f431;一、I…

【MATLAB源码-第113期】基于matlab的孔雀优化算法(POA)机器人栅格路径规划,输出做短路径图和适应度曲线。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 POA&#xff08;孔雀优化算法&#xff09;是一种基于孔雀羽毛开屏行为启发的优化算法。这种算法模仿孔雀通过展开其色彩斑斓的尾羽来吸引雌性的自然行为。在算法中&#xff0c;每个孔雀代表一个潜在的解决方案&#xff0c;而…

linux驱动(六):input(key)

本文主要探讨210的input子系统。 input子系统 input子系统包含:设备驱动层,输入核心层,事件驱动层 事件处理层&#xff1a;接收核心层上报事件选择对应struct input_handler处理,每个input_handler对象处理一类事件,同类事件的设备驱动共用同一handler …

TCP连接TIME_WAIT

TCP断开过程: TIME_WAIT的作用: TIME_WAIT状态存在的理由&#xff1a; 1&#xff09;可靠地实现TCP全双工连接的终止 在进行关闭连接四次挥手协议时&#xff0c;最后的ACK是由主动关闭端发出的&#xff0c;如果这个最终的ACK丢失&#xff0c;服务器将重发最终的FIN&#xf…

Unity与Android交互通信系列(4)

上篇文章我们实现了模块化调用&#xff0c;运用了模块化设计思想和简化了调用流程&#xff0c;本篇文章讲述UnityPlayerActivity类的继承和使用。 在一些深度交互场合&#xff0c;比如Activity切换、程序启动预处理等&#xff0c;这时可能会需要继承Application和UnityPlayerAc…

【目标检测】YOLOv7算法实现(一):模型搭建

本系列文章记录本人硕士阶段YOLO系列目标检测算法自学及其代码实现的过程。其中算法具体实现借鉴于ultralytics YOLO源码Github&#xff0c;删减了源码中部分内容&#xff0c;满足个人科研需求。   本篇文章在YOLOv5算法实现的基础上&#xff0c;进一步完成YOLOv7算法的实现。…

设置了uni.chooseLocation,小程序中打不开

设置了uni.chooseLocation&#xff0c;在小程序打不开&#xff0c;点击没反应&#xff0c;地图显现不出来&#xff1b; 解决方案&#xff1a; 1.Hbuilder——微信开发者工具路径没有配置 打开工具——>设置 2.微信小程序服务端口没有开 解决方法&#xff1a;打开微信开发…

【ug572】UltraScale体系结构时钟资源手册节选(二)

时钟缓冲区 The PHY global clocking contains several sets of BUFGCTRLs, BUFGCEs, and BUFGCE_DIVs. Each set can be driven by four GC pins from the adjacent bank, MMCMs, PLLs in the same PHY, and interconnect. The clock buffers then drive the routing and di…

❤ Uniapp使用三( 打包和发布上线)

❤ Uniapp使用三( 打包和发布上线) 一、介绍 什么是 uniapp&#xff1f; uniapp 是一种基于 Vue.js 的多平台开发框架&#xff0c;它可以同时用于开发安卓、iOS、H5 等多个平台。因此&#xff0c;只需要写一次代码就可以在多个平台上运行&#xff0c;提高了开发效率。 打包…