Spark-06:共享变量

目录

1.广播变量(broadcast variables)

2.累加器(accumulators)


      在分布式计算中,当在集群的多个节点上并行运行函数时,默认情况下,每个任务都会获得函数中使用到的变量的一个副本。如果变量很大,这会导致网络传输占用大量带宽,并且在每个节点上都占用大量内存空间。为了解决这个问题,Spark引入了共享变量的概念。

        共享变量允许在多个任务之间共享数据,而不是为每个任务分别复制一份变量。这样可以显著降低网络传输的开销和内存占用。Spark提供了两种类型的共享变量:广播变量(broadcast variables)和累加器(accumulators)。

1.广播变量(broadcast variables)

        通常情况下,Spark程序运行时,通常会将数据以副本的形式分发到每个执行器(Executor)的任务(Task)中,但当变量较大时,这会导致大量的内存和网络开销。通过使用广播变量,Spark将变量只发送一次到每个节点,并在多个任务之间共享这个副本,从而显著降低了内存占用和网络传输的开销。

Scala 实现:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

Java 实现:

Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});broadcastVar.value();
// returns [1, 2, 3]

2.累加器(accumulators)

        累加器是Spark中的一种特殊类型的共享变量,主要用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。累加器支持的数据类型仅限于数值类型,包括整数和浮点数等。

Scala 实现:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 sscala> accum.value
res2: Long = 10

Java 实现:

LongAccumulator accum = jsc.sc().longAccumulator();sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 saccum.value();
// returns 10

        内置累加器功能有限,但可以通过继承AccumulatorV2来创建自己的类型。AccumulatorV2抽象类有几个方法必须重写:reset用于将累加器重置为零,add用于向累加器中添加另一个值,merge用于将另一个相同类型的累加器合并到此累加器。

自定义累加器Scala实现:

package com.yichenkeji.demo.sparkscalaimport org.apache.spark.util.AccumulatorV2class CustomAccumulator extends AccumulatorV2[Int, Int]{//初始化累加器的值private var sum = 0override def isZero: Boolean = sum == 0override def copy(): AccumulatorV2[Int, Int] = {val newAcc = new CustomAccumulator()newAcc.sum = sumnewAcc}override def reset(): Unit = sum = 0override def add(v: Int): Unit = sum += voverride def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.valueoverride def value: Int = sum
}

自定义累加器Java实现:

package com.yichenkeji.demo.sparkjava;import org.apache.spark.util.AccumulatorV2;public class CustomAccumulator extends AccumulatorV2<Integer, Integer> {// 初始化累加器的值private Integer sum = 0;@Overridepublic boolean isZero() {return sum == 0;}@Overridepublic AccumulatorV2<Integer, Integer> copy() {CustomAccumulator customAccumulator = new CustomAccumulator();customAccumulator.sum = this.sum;return customAccumulator;}@Overridepublic void reset() {this.sum = 0;}@Overridepublic void add(Integer v) {this.sum += v;}@Overridepublic void merge(AccumulatorV2<Integer, Integer> other) {this.sum += ((CustomAccumulator) other).sum;}@Overridepublic Integer value() {return sum;}
}

自定义累加器的使用:

package com.yichenkeji.demo.sparkjava;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;import java.util.Arrays;
import java.util.List;public class AccumulatorTest {public static void main(String[] args) {//1.初始化SparkContext对象SparkConf sparkConf = new SparkConf().setAppName("Spark Java").setMaster("local[*]");JavaSparkContext sc = new JavaSparkContext(sparkConf);CustomAccumulator customAccumulator = new CustomAccumulator();//注册自定义累加器才能使用sc.sc().register(customAccumulator);sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).foreach(x -> customAccumulator.add(x));System.out.println(customAccumulator.value());//5.停止SparkContextsc.stop();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/213197.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

123. 股票买卖的最佳时机III(2次交易)

题目 题解 class Solution:def maxProfit(self, prices: List[int]) -> int:N len(prices)# 状态定义 dp[i][j][k]代表在第i天&#xff0c;被允许完成j次交易时&#xff0c;持有或者不持有的最大利润。k0代表不持有&#xff0c;k1代表持有dp [[[0 for k in range(2)] for…

分布式锁之基于mysql实现分布式锁(四)

不管是jvm锁还是mysql锁&#xff0c;为了保证线程的并发安全&#xff0c;都提供了悲观独占排他锁。所以独占排他也是分布式锁的基本要求。 可以利用唯一键索引不能重复插入的特点实现。设计表如下&#xff1a; CREATE TABLE tb_lock (id bigint(20) NOT NULL AUTO_INCREMENT,…

数据防泄漏系统有什么作用及优势?

数据防泄漏系统(Data Loss Prevention&#xff0c;简称DLP)是一种重要的信息安全解决方案&#xff0c;旨在防止敏感数据被未经授权地泄露、滥用或盗窃。它在保护企业、政府机构和个人的隐私和机密信息方面发挥着关键作用。以下是数据防泄漏系统的作用及优势&#xff1a; 作用&a…

STM32:基本定时器原理和定时程序

一、初识定时器TIM 定时器就是计数器&#xff0c;定时器的作用就是设置一个时间&#xff0c;然后时间到后就会通过中断等方式通知STM32执行某些程序。定时器除了可以实现普通的定时功能&#xff0c;还可以实现捕获脉冲宽度&#xff0c;计算PWM占空比&#xff0c;输出PWM波形&am…

【开源】基于Vue.js的固始鹅块销售系统

项目编号&#xff1a; S 060 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S060&#xff0c;文末获取源码。} 项目编号&#xff1a;S060&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 鹅块类型模块2.3 固…

YOLOv5 分类模型 预处理 OpenCV实现

YOLOv5 分类模型 预处理 OpenCV实现 flyfish YOLOv5 分类模型 预处理 PIL 实现 YOLOv5 分类模型 OpenCV和PIL两者实现预处理的差异 YOLOv5 分类模型 数据集加载 1 样本处理 YOLOv5 分类模型 数据集加载 2 切片处理 YOLOv5 分类模型 数据集加载 3 自定义类别 YOLOv5 分类模型…

vue3-组件传参及计算属性

​&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;Vue篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来vue篇专栏内容:vue3-组件传参及计算属性 目录 vue3中的组件传参 1、父传子 2、子传父 toRef 与 toRefs vue3中…

杭州银行连接解决方案:集成CRM、用户运营和广告推广系统

自动化与智能化是企业新的增长引擎。在数字化时代&#xff0c;企业需要通过数字化工具来提高效率和效益&#xff0c;这也是杭州银行推出的连接解决方案的初衷。该解决方案集成了CRM、用户运营和广告推广系统&#xff0c;为企业提供全方位的数字化转型支持。 杭州银行连接解决方…

多功能回馈式交流电子负载的应用

多功能回馈式交流电子负载是用于模拟和测试电源、电池等电子设备的负载工具。它具有多种应用&#xff0c;可以用于测试和评估各种类型的电源&#xff0c;包括直流电源和交流电源。它可以模拟各种负载条件&#xff0c;如恒定电流、恒定电压和恒定功率&#xff0c;以验证电源的性…

WPF实战项目十六(客户端):备忘录接口

1、新增IMemoService接口&#xff0c;继承IBaseService接口 public interface IMemoService : IBaseService<MemoDto>{} 2、新增MemoService类&#xff0c;继承BaseService和IMemoService接口 public class MemoService : BaseService<MemoDto>, IMemoService{pub…

PC分页操作以及loading效果

page-size 每页显示条目个数 current-page 当前页数 total 数据总数 current-change【currentPage 改变时会触发】 切换分页时会先加载&#xff0c;等在接口数据&#xff0c;接口返回&#xff0c;加载会关闭&#xff08;在获取接口数据完毕哪里加上this.loadingfalse&#xff0…

掘金产业数字化,百望云荣登2023中国产业数字化新锐势力榜

日前&#xff0c;“第十届中国产业数字化大会”在南京盛大举行&#xff0c;百望云荣登“中国产业数字化新锐势力榜”&#xff0c;市场价值与方案能力再获认可&#xff01; 据悉共有50家企业登上榜单&#xff0c;本次会议作为“中国&#xff08;南京&#xff09;电子商务大会”的…