Spark10-11

10. 广播变量

10.1 广播变量的使用场景

在很多计算场景,经常会遇到两个RDD进行JOIN,如果一个RDD对应的数据比较大,一个RDD对应的数据比较小,如果使用JOIN,那么会shuffle,导致效率变低。广播变量就是将相对较小的数据,先收集到Driver,然后再通过网络广播到属于该Application对应的每个Executor中,以后处理大量数据对应的RDD关联数据,就不用shuffle了,而是直接在内存中关联已经广播好的数据,即通实现mapside join,可以将Driver端的数据广播到属于该application的Executor,然后通过Driver广播变量返回的引用,获取实现广播到Executor的数据

广播变量的特点:广播出去的数据就无法在改变了,在没有Executor中是只读的操作,在每个Executor中,多个Task使用一份广播变量

 

10.2 广播变量的实现原理

广播变量是通过BT的方式广播的(TorrentBroadcast),多个Executor可以相互传递数据,可以提高效率

sc.broadcast这个方法是阻塞的(同步的)

广播变量一但广播出去就不能改变,为了以后可以定期的改变要关联的数据,可以定义一个object[单例对象],在函数内使用,并且加一个定时器,然后定期更新数据

广播到Executor的数据,可以在Driver获取到引用,然后这个引用会伴随着每一个Task发送到Executor,然后通过这个引用,获取到事先广播好的数据

10.3 案例:根据IP计算归属地

10.3.1 需求

根据IP规则数据,计算出给定日志中ip地址对应的省份信息,由于IP地址的规则数据相对较小,所以可以将IP规则数据先广播出去,以后关联IP规则数据,就可以在内存中进行关联了,这样可以避免shuffle,提高执行效率!

10.3.2 代码实现

11. 序列化问题

11.1 序列化问题的场景

spark任务在执行过程中,由于编写的程序不当,任务在执行时,会出序列化问题,通常有以下两种情况,

  • 封装数据的Bean没有实现序列化接口(Task已经生成了),在ShuffleWirte之前要将数据溢写磁盘,会抛出异常
  • 函数闭包问题,即函数的内部,使用到了外部没有实现序列化的引用(Task没有生成)

11.2 数据Bean未实现序列化接口

spark在运算过程中,由于很多场景必须要shuffle,即向数据溢写磁盘并且在网络间进行传输,但是由于封装数据的Bean没有实现序列化接口,就会导致出现序列化的错误!

Scala

object C02_CustomSort {

  def main(args: Array[String]): Unit = {

    val sc = SparkUtil.getContext(this.getClass.getSimpleName, true)
    //使用并行化的方式创建RDD
    val lines = sc.parallelize(
      List(
        "laoduan,38,99.99",
        "nianhang,33,99.99",
        "laozhao,18,9999.99"
      )
    )
    val tfBoy: RDD[Boy] = lines.map(line => {
      val fields = line.split(",")
      val name = fields(0)
      val age = fields(1).toInt
      val fv = fields(2).toDouble
      new Boy(name, age, fv) //将数据封装到一个普通的class中
    })

    implicit val ord = new Ordering[Boy] {
      override def compare(x: Boy, y: Boy): Int = {
        if (x.fv == y.fv) {
          x.age - y.age
        } else {
          java.lang.Double.compare(y.fv, x.fv)
        }
      }
    }
    //sortBy会产生shuffle,如果Boy没有实现序列化接口,Shuffle时会报错
    val sorted: RDD[Boy] = tfBoy.sortBy(bean => bean)

    val res = sorted.collect()

    println(res.toBuffer)
  }
}

//如果以后定义bean,建议使用case class
class Boy(val name: String, var age: Int, var fv: Double)  //extends Serializable
{
  
  override def toString = s"Boy($name, $age, $fv)"
}

11.3 函数闭包问题

11.3.1 闭包的现象

在调用RDD的Transformation和Action时,可能会传入自定义的函数,如果函数内部使用到了外部未被序列化的引用,就会报Task无法序列化的错误。原因是spark的Task是在Driver端生成的,并且需要通过网络传输到Executor中,Task本身实现了序列化接口,函数也实现了序列化接口,但是函数内部使用到的外部引用不支持序列化,就会函数导致无法序列化,从而导致Task没法序列化,就无法发送到Executor中了

 

在调用RDD的Transformation或Action是传入函数,第一步就进行检测,即调用sc的clean方法

为了避免错误,在Driver初始化的object或class必须实现序列化接口,不然会报错误

Scala
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f) //检测函数是否可以序列化,如果可以直接将函数返回,如果不可以,抛出异常
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}

Scala
private def ensureSerializable(func: AnyRef): Unit = {
  try {
    if (SparkEnv.get != null) {
      //获取spark执行换的的序列化器,如果函数无法序列化,直接抛出异常,程序退出,根本就没有生成Task
      SparkEnv.get.closureSerializer.newInstance().serialize(func)
    }
  } catch {
    case ex: Exception => throw new SparkException("Task not serializable", ex)
  }
}

11.3.2 在Driver端初始化实现序列化的object

在一个Executor中,多个Task使用同一个object对象,因为在scala中,object就是单例对象,一个Executor中只有一个实例,Task会反序列化多次,但是引用的单例对象只反序列化一次

Scala
//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//函数外部定义的一个引用类型(变量)
//RuleObjectSer是一个静态对象,实在第一次使用的时候被初始化了(实在Driver被初始化的)
val rulesObj = RuleObjectSer

//函数实在Driver定义的
val func = (line: String) => {
  val fields = line.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  val name = rulesObj.rulesMap.getOrElse(code, "未知") //闭包
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesObj.toString)
}

//处理数据,关联维度
val res = lines.map(func)
res.saveAsTextFile(args(2))

 

11.3.3 在Driver端初始化实现序列化的class

在一个Executor中,每个Task都会使用自己独享的class实例,因为在scala中,class就是多例,Task会反序列化多次,每个Task引用的class实例也会被序列化

Scala
//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//函数外部定义的一个引用类型(变量)
//RuleClassNotSer是一个类,需要new才能实现(实在Driver被初始化的)
val rulesClass = new RuleClassSer

//处理数据,关联维度
val res = lines.map(e => {
  val fields = e.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  val name = rulesClass.rulesMap.getOrElse(code, "未知") //闭包
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesClass.toString)
})

res.saveAsTextFile(args(2))

 

11.3.4 在函数内部初始化未序列化的object

object没有实现序列化接口,不会出现问题,因为该object实现函数内部被初始化的,而不是在Driver初始化的

Scala
//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//不再Driver端初始化RuleObjectSer或RuleClassSer
//函数实在Driver定义的
val func = (line: String) => {
  val fields = line.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  //在函数内部初始化没有实现序列化接口的RuleObjectNotSer
  val name = RuleObjectNotSer.rulesMap.getOrElse(code, "未知")
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, RuleObjectNotSer.toString)
}
//处理数据,关联维度
val res = lines.map(func)
res.saveAsTextFile(args(2))
sc.stop()

 

11.3.5 在函数内部初始化未序列化的class

这种方式非常不好,因为每来一条数据,new一个class的实例,会导致消耗更多资源,jvm会频繁GC

Scala
//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))

//处理数据,关联维度
val res = lines.map(e => {
  val fields = e.split(",")
  val id = fields(0).toInt
  val code = fields(1)
  //RuleClassNotSer是在Executor中被初始化的
  val rulesClass = new RuleClassNotSer
  //但是如果每来一条数据new一个RuleClassNotSer,不好,效率低,浪费资源,频繁GC
  val name = rulesClass.rulesMap.getOrElse(code, "未知")
  //获取当前线程ID
  val treadId = Thread.currentThread().getId
  //获取当前Task对应的分区编号
  val partitiondId = TaskContext.getPartitionId()
  //获取当前Task运行时的所在机器的主机名
  val host = InetAddress.getLocalHost.getHostName
  (id, code, name, treadId, partitiondId, host, rulesClass.toString)
})

res.saveAsTextFile(args(2))

11.3.6 调用mapPartitions在函数内部初始化未序列化的class

一个分区使用一个class的实例,即每个Task都是自己的class实例

Scala
//从HDFS中读取数据,创建RDD
//HDFS指定的目录中有4个小文件,内容如下:
//1,ln
val lines = sc.textFile(args(1))
//处理数据,关联维度
val res = lines.mapPartitions(it => {
  //RuleClassNotSer是在Executor中被初始化的
  //一个分区的多条数据,使用同一个RuleClassNotSer实例
  val rulesClass = new RuleClassNotSer
  it.map(e => {
    val fields = e.split(",")
    val id = fields(0).toInt
    val code = fields(1)
    val name = rulesClass.rulesMap.getOrElse(code, "未知")
    //获取当前线程ID
    val treadId = Thread.currentThread().getId
    //获取当前Task对应的分区编号
    val partitiondId = TaskContext.getPartitionId()
    //获取当前Task运行时的所在机器的主机名
    val host = InetAddress.getLocalHost.getHostName
    (id, code, name, treadId, partitiondId, host, rulesClass.toString)
  })
})
res.saveAsTextFile(args(2))
sc.stop()

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/1511.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于SpringBoot+Vue+微信小程序的电影平台

✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取项目下载方式🍅 一、项目背景介绍: 研究背景:…

A. Portal(dp优化枚举)

Problem - 1580A - Codeforces CQXYM发现了一个大小为nm的矩形。矩形由n行m列的方块组成,每个方块可以是黑曜石方块或空方块。CQXYM可以通过一次操作将黑曜石方块变为空方块,或将空方块变为黑曜石方块。 一个大小为ab的矩形M被称为传送门,当…

【adb指令】

一、什么是adb adb的全称为Android Debug Bridge,官方提供的用于操作安卓设备的工具。 二、adb用来干什么? 在电脑终端通过命令行: 打开收手机应用;传输文件;点击、输入、滑动等;硬件操作、返回、回到首…

SpringBoot04:JSR303数据校验及多环境切换

目录 一、JSR303数据校验 1、如何使用? 2、常见参数 二、多环境切换 1、多配置文件 2、yaml的多文档块 3、配置文件加载位置 一、JSR303数据校验 1、如何使用? SpringBoot中可以用Validated来校验数据,如果数据异常则会统一抛出异常…

php中的双引号与单引号的基本使用

字符串,在各类编程语言中都是一个非常重要的数据类型 网页当中的图片,文字,特殊符号,HTMl标签,英文等都属于字符串 PHP字符串变量用于存储并处理文本, 在创建字符串之后,我们就可以对它进行操作。我们可以直接在函数中使用字符串,或者把它存储在变量中 字…

第9章 异常处理

第9章 异常处理 9.1 Java异常处理 try-catch-finally ​ ​ 9.2 Scala异常处理 ​ ​ package chapter09object Test01_Exception {def main(args: Array[String]): Unit {try {val n 10 / 1} catch {case e: ArithmeticException > {println("发生算数异常&quo…

【MySQL数据库 | 第二十篇】explain执行计划

目录 前言: explain: 语法: 总结: 前言: 上一篇我们介绍了从时间角度分析MySQL语句执行效率的三大工具:SQL执行频率,慢日志查询,profile。但是这三个方法也只是在时间角度粗略的…

简要记录java 锁

Java中的锁机制主要分为Lock和Synchronized. Synchronized在JVM里的实现是基于进入和退出Monitor对象来实现方法同步和代码块同步的。monitorenter指令是在编译后插入到同步代码块的开始位置,而monitorexit是插入到方法结束处和异常处,JVM要保证每个mon…

java循环结构

Java中有三种主要的循环结构: while 循环do…while 循环for 循环 在Java5中引入了一种主要用于数组的增强型for循环。 1.while循环 语法: while(布尔表达式){ // 循环内容 } 复制 实例: public class Demo{ public static void main(S…

【数据库】MySQL 高级(进阶) SQL 语句

文章目录 前提条件一、常用查询1. SELECT(显示查询)2. DISTINCT(不重复查询)3. WHERE(有条件查询)4. AND/OR(且/或)5. IN (显示已知值的字段)6. BETWEEN&…

适合嵌入式开发的GUI(嵌入式学习)

嵌入式开发的GUI如何选择? 常见的嵌入式GUI开发方法轻量级GUI库优缺点 基于Web技术优缺点 Qt框架优缺点 原生开发优缺点 嵌入式系统的限制 常见的嵌入式GUI开发方法 嵌入式开发中的GUI(图形用户界面)是指在嵌入式系统中实现图形化的用户界面…

深度学习05-RNN循环神经网络

概述 循环神经网络(Recurrent Neural Network,RNN)是一种具有循环连接的神经网络结构,被广泛应用于自然语言处理、语音识别、时序数据分析等任务中。相较于传统神经网络,RNN的主要特点在于它可以处理序列数据&#xf…