spark12-13-14

12. Task线程安全问题

12.1 现象和原理

在一个Executor可以同时运行多个Task,如果多个Task使用同一个共享的单例对象,如果对共享的数据同时进行读写操作,会导致线程不安全的问题,为了避免这个问题,可以加锁,但效率变低了,因为在一个Executor中同一个时间点只能有一个Task使用共享的数据,这样就变成了串行了,效率低!

12.2 案例

定义一个工具类object,格式化日期,因为SimpleDateFormat线程不安全,会出现异常

Scala
val conf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") //本地模式,开多个线程
//1.创建SparkContext
val sc = new SparkContext(conf)

val lines = sc.textFile("data/date.txt")

val timeRDD: RDD[Long] = lines.map(e => {
  //将字符串转成long类型时间戳
  //使用自定义的object工具类
  val time: Long = DateUtilObj.parse(e)
  time
})

val res = timeRDD.collect()
println(res.toBuffer)

Scala
object DateUtilObj {

  //多个Task使用了一个共享的SimpleDateFormat,SimpleDateFormat是线程不安全

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  //线程安全的
  //val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  def parse(str: String): Long = {
    //2022-05-23 11:39:30
    sdf.parse(str).getTime
  }

}

上面的程序会出现错误,因为多个Task同时使用一个单例对象格式化日期,报错,如果加锁,程序会变慢,改进后的代码:

Scala
val conf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[*]") //本地模式,开多个线程
//1.创建SparkContext
val sc = new SparkContext(conf)

val lines = sc.textFile("data/date.txt")

val timeRDD = lines.mapPartitions(it => {
  //一个Task使用自己单独的DateUtilClass实例,缺点是浪费内存资源
  val dataUtil = new DateUtilClass
  it.map(e => {
    dataUtil.parse(e)
  })
})

val res = timeRDD.collect()
println(res.toBuffer)

Scala
class DateUtilClass {

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  def parse(str: String): Long = {
    //2022-05-23 11:39:30
    sdf.parse(str).getTime
  }
}

改进后,一个Task使用一个DateUtilClass实例,不会出现线程安全的问题。

13. 累加器

累加器是Spark中用来做计数功能的,在程序运行过程当中,可以做一些额外的数据指标统计

需求:在处理数据的同时,统计一下指标数据,具体的需求为:将RDD中对应的每个元素乘以10,同时在统计每个分区中偶数的数据

13.1 不使用累加器的方案

需要多次触发Action,效率低,数据会被重复计算

Scala
/**
 * 不使用累加器,而是触发两次Action
 */
object C12_AccumulatorDemo1 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[*]") //本地模式,开多个线程
    //1.创建SparkContext
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    //对数据进行转换操作(将每个元素乘以10),同时还要统计每个分区的偶数的数量
    val rdd2 = rdd1.map(_ * 10)
    //第一次触发Action
    rdd2.saveAsTextFile("out/111")

    //附加的指标统计
    val rdd3 = rdd1.filter(_ % 2 == 0)
    //第二个触发Action
    val c = rdd3.count()
    println(c)
  }
}

13.2 使用累加器的方法

触发一次Action,并且将附带的统计指标计算出来,可以使用Accumulator进行处理,Accumulator的本质数一个实现序列化接口class,每个Task都有自己的累加器,避免累加的数据发送冲突

Scala
object C14_AccumulatorDemo3 {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local[*]") //本地模式,开多个线程
    //1.创建SparkContext
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
    //在Driver定义一个特殊的变量,即累加器
    //Accumulator可以将每个分区的计数结果,通过网络传输到Driver,然后进行全局求和
    val accumulator: LongAccumulator = sc.longAccumulator("even-acc")
    val rdd2 = rdd1.map(e => {
      if (e % 2 == 0) {
        accumulator.add(1)  //闭包,在Executor中累计的
      }
      e * 10
    })

    //就触发一次Action
    rdd2.saveAsTextFile("out/113")

    //每个Task中累计的数据会返回到Driver吗?
    println(accumulator.count)
  }
}

14. StandAlone的两种执行模式

spark自动的StandAlone集群有两种运行方式,分别是client模式和cluster模式,默认使用的是client模式。两种运行模式的本质区别是,Driver运行在哪里了

14.1 什么是Driver

Driver本意是驱动的意思(类似叫法的有MySQL的连接驱动),在就是与集群中的服务建立连接,执行一些命令和请求的。但是在Spark的Driver指定就是SparkContext和里面创建的一些对象,所有可以总结为,SparkContext在哪里创建,Driver就在哪里。Driver中包含很多的对象实例,有SparkContext,DAGScheduler、TaskScheduler、ShuffleManager、BroadCastManager等,Driver是对这些对象的统称。

14.2 client模式

Driver运行在用来提交任务的SparkSubmit进程中,在Spark的stand alone集群中,提交spark任务时,可以使用cluster模式即--deploy-mode client (默认的)

 

 

注意:spark-shell只能以client模式运行,不能以cluster模式运行,因为提交任务的命令行客户端和SparkContext必须在同一个进程中。

 

14.3 cluster模式

Driver运行在Worker启动的一个进程中,这个进程叫DriverWapper,在Spark的stand alone集群中,提交spark任务时,可以使用cluster模式即--deploy-mode cluster

特点:Driver运行在集群中,不在SparkSubmit进程中,需要将jar包上传到hdfs中

Shell
spark-submit --master spark://node-1.51doit.cn:7077 --class cn._51doit.spark.day01.WordCount --deploy-mode cluster hdfs://node-1.51doit.cn:9000/jars/spark10-1.0-SNAPSHOT.jar hdfs://node-1.51doit.cn:9000/wc hdfs://node-1.51doit.cn:9000/out002

 

cluster模式的特点:可以给Driver更灵活的指定一些参数,可以给Driver指定内存大小,cores的数量

如果一些运算要在Driver进行计算,或者将数据收集到Driver端,这样就必须指定Driver的内存和cores更大一些

Shell
# 指定Driver的内存,默认是1g
--driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
# 指定Driver的cores,默认是1
--driver-cores NUM          Number of cores used by the driver, only in cluster mode (Default: 1).

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/2924.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux操作系统学习——启动

概要 Linux操作系统内核是服务端学习的根基,也是提高编程能力、源码阅读能力和进阶知识学习能力的重要部分,本文开始将记录Linux操作系统中的各个部分源码学习历程。 1. 理解代码的组织结构 以Linux源码举例,首先你得知道操作系统分为哪几个部…

Django框架之邮件系统,涉及HTML、富文本、附件邮件发送

参考 (892条消息) Django框架之邮件系统,涉及HTML、富文本、附件邮件发送_django邮件系统_李恩泽的技术博客的博客-CSDN博客https://blog.csdn.net/heroiclee/article/details/121406488 发送设置(settings.py) EMAIL_USE_SSL True # Sec…

Qt关闭主窗口后,退出所有异步线程

目录 1.要知道主窗口什么时候关闭2.关闭异步线程 1.要知道主窗口什么时候关闭 在widget.h新增下面的函数 private slots:void closeEvent(QCloseEvent *event);在widget.cpp新增 void Widget::closeEvent(QCloseEvent *event) {qDebug() << "关闭主窗口了&#x…

机器学习复习6

机器学习复习 1 - 在机器学习的背景下&#xff0c;什么是诊断(diagnostic)&#xff1f; A. 这指的是衡量一个学习算法在测试集(算法没有被训练过的数据)上表现如何的过程 B. 迅速尝试尽可能多的不同方法来改进算法&#xff0c;从而看看什么方法有效 C. 机器学习在医疗领域的应用…

基于Java+Swing+Mysql物流跟踪管理系统

基于JavaSwingMysql物流跟踪管理系统 一、系统介绍二、功能展示1.主页2.新增物流信息3.删除物流信息 三、数据库四、其他系统实现五、获取源码 一、系统介绍 该系统实现了查看物流列表、新增物流信息、删除物流信息 运行环境&#xff1a;eclipse、idea、jdk1.8 二、功能展示…

satellite: 利用TLE动态计算并实时显示多颗卫星的位置及轨迹

本示例的目的是介绍演示如何在vue+satellite项目中利用两行根数动态地计算,并显示多个卫星的位置及轨迹。每秒钟更新一下卫星的位置和角度,加载当前时间到固定时间(如720分钟后)的一段轨迹。 直接复制下面的 vue+openlayers源示例代码,操作2分钟即可运行实现效果 文章目…

怎么去除视频里的背景音乐?其实非常简单!

如何去除视频背景音乐&#xff1f;在视频处理中&#xff0c;有时我们需要从视频中提取声音并进行处理&#xff0c;而不仅仅是简单地去除整个背景音乐。我们可能需要有选择性地去除人声或背景音乐。这个处理过程对于选用合适的工具至关重要。在本文中&#xff0c;我将分享两种可…

深度学习图像分类、目标检测、图像分割源码小项目

​demo仓库和视频演示&#xff1a; 银色子弹zg的个人空间-银色子弹zg个人主页-哔哩哔哩视频 卷积网路CNN分类的模型一般使用包括alexnet、DenseNet、DLA、GoogleNet、Mobilenet、ResNet、ResNeXt、ShuffleNet、VGG、EfficientNet和Swin transformer等10多种模型 目标检测包括…

基于Python热门旅游景点数据分析系统设计与实现

博主介绍&#xff1a; ✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、java优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战 ✌ &#x1f345; 文末获取源码联系 &#x1f345; &#x1f447;&#x1f3fb; 精彩专…

【MySql】基本查询

文章目录 插入操作insert查询操作selectselect查询where条件判断order by排序limit筛选分页结果 更新操作update删除操作delete插入查询结果 CRUD : Create(创建), Retrieve(读取)&#xff0c;Update(更新)&#xff0c;Delete&#xff08;删除&#xff09; 先创建提供一张表&am…

【CentOS】VirtualBox yum 无法使用

【CentOS】VirtualBox yum 无法使用 yum install net-tools -y出现如下错误&#xff1a; cannot find a valid baseurl for repo: base/7/x86_64或无法ping 解决如下&#xff1a; 进入系统&#xff0c;修改网卡信息。 执行修改命令&#xff0c;ifcfg-xxx 后面的xxx就是上面…

【从删库到跑路】MySQL系列——详细讲解SQL的DDL,DML,DQL,DCL语句

&#x1f38a;专栏【MySQL】 &#x1f354;喜欢的诗句&#xff1a;更喜岷山千里雪 三军过后尽开颜。 &#x1f386;音乐分享【如愿】 大一同学小吉&#xff0c;欢迎并且感谢大家指出我的问题&#x1f970; 文章目录 &#x1f354;关系型数据库⭐概念⭐特点 &#x1f354;MySQL数…