Spark入门案例

Spark shell简介

  • 启动 Spark shell 进入 Spark 安装目录后执行 spark-shell - -master master就可以提交Spark任务
  • Spark shell 的原理是把每一·行Scala代码编译成类,最终交由Spark执行

Master 地址的设置

Master的地址可以有如下几种设置方式

地址解释
local[N]使用 N 条 Worker 线程在本地运行
spark://host:port在 Spark standalone中 运行,指定 Spark 集群的Master地址,端口默认为 7077
mesos://host:port在 Apache Mesos 中运行,指定 Meso的地址
yarn在 Yarn 中运行,Yarn 的地址由环境变量 HADOOP_CONF_DIR 来指定

 

编写Spark代码的两种方式

编写Spark代码的两种方式

  • spark-shell

    Spark shell是Spark提供的一个基于Scala语言的交互式解释器,类似于Scala提供的交互式解释器,Spark shel也可以直接在Shell中编写代码执行 这种方式也比较重要,因为一般的数据分析任务可能需要探索着进行,不是一藏而就的,使用Spark shell先进行探索,当代码稳定以后,使用独立应用的方式来提交任务,这样是一个比较常见的流程

  • spark-submit

    Spark submit是一个命令,用于提交Scala编写的基于Spark框架,这种提交方式常用作于在集群中运行任务

 

Spark-Shell

Spark-Shell 读取本地文件

Step1 准备文件

在 master 中创建文件/root/data/wordcount.txt

vi /root/data/wordcount.txt
# 加入以下内容
hadoop spark flume
spark hadoop
flume hadoop

Step 2 启动Spark shell

cd /root/spark
bin/spark-shell --master local[2]

Step3 执行如下代码

scala> val sourceRdd = sc.textFile("file:///root/data/wordcount.txt")         # 读取文件
sourceRdd: org.apache.spark.rdd.RDD[String] = file:///root/data/wordcount.txt MapPartitionsRDD[1] at textFile at <console>:24scala> val flattenCountRdd = sourceRdd.flatMap(_.split(" ")).map((_,1))       # 拆分单词,并给与每个单词的词频为1
flattenCountRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:26scala> val aggCountRdd = flattenCountRdd.reduceByKey(_ + _)                   # 词频聚合    
aggCountRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:28scala> val result = aggCountRdd.collect
result: Array[(String, Int)] = Array((spark,2), (hadoop,3), (flume,2))

运行流程

  1. flatMap(_ .split(" ")) **将数据转为数组的形式,并展平为多个数据
  2. map( _ , 1) 将数据转换为元组的形式
  3. reduceByKey(_ + _) 将数据以 key 值相同聚合

 

Spark-Shell 读取HDFS

Step1上传文件到 HDFS 中

cd /root/data
hdfs dfs -mkdir /dataset
hdfs dfs -put wordcount.txt /dataset/

Step2 在Spark shell中访问 HDFS

val sourceRdd = sc.textFile("hdfs://master:9000/dataset/wordcount.txt") # sc.textFile("hdfs:///dataset/wordcount.txt") 或者 sc.textFile("/dataset/wordcount.txt") val flattenCountRdd = sourceRdd.flatMap(_.split(" ")).map((_,1))val aggCountRdd = flattenCountRdd.reduceByKey(_ + _)val result = aggCountRdd.collect

独立应用编写

Step 1 创建工程

  1. 创建 IDEA 工程
    1. Create New Project → Maven → Next
    2. Groupld: cn.itcast → Artifactld: spark
  2. 增加 Scala 支持
    1. 右键点击工程目录

    2. 点击增加框架支持

    3. 选择Scala添加框架支持

Step 2 编写 Maven 配置文件 pom.xml

  1. 找到工程目录下的 pom.xml文件,(无则新增)

  2. 添加以下内容

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="<http://maven.apache.org/POM/4.0.0>"xmlns:xsi="<http://www.w3.org/2001/XMLSchema-instance>"xsi:schemaLocation="<http://maven.apache.org/POM/4.0.0> <http://maven.apache.org/xsd/maven-4.0.0.xsd>"><modelVersion>4.0.0</modelVersion><groupId>cn.itcast</groupId><artifactId>spark</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.8</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.1.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.7</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.10</version><scope>provided</scope></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
    </project>
    
  3. 创建目录 src/main/scala 和目录 src/test/scala

  4. 创建Scala object WordCount

Step 3 编写代码

  • 本地运行

    package cn.itcast.spark.rddimport org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit = {// 1、创建SparkContextval conf = new SparkConf().setMaster("local[6]").setAppName("word_count")val sc = new SparkContext(conf)// 2、加载文件//    1、准备文件//    2、读取文件val rdd1 = sc.textFile("./dataset/wordcount.txt")// 3、处理//    1、把整句话拆分成多个单词val rdd2 = rdd1.flatMap(item => item.split(" "))//    2、把每个单词指定一个词频1val rdd3 = rdd2.map(item => (item,1))//    3、整合val rdd4 = rdd3.reduceByKey((x , y) => (x + y))// 4、得到结果val result = rdd4.collect()result.foreach(item=>println(item))}
    }
    
  • 提交运行

    package cn.itcast.spark.rddimport org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit = {// 1、创建SparkContextval conf = new SparkConf().setAppName("word_count")val sc = new SparkContext(conf)// 2、加载文件//    1、准备文件//    2、读取文件val rdd1 = sc.textFile("hdfs:///data/wordcount.txt")// 3、处理//    1、把整句话拆分成多个单词val rdd2 = rdd1.flatMap(item => item.split(" "))//    2、把每个单词指定一个词频1val rdd3 = rdd2.map(item => (item,1))//    3、整合val rdd4 = rdd3.reduceByKey((x , y) => (x + y))// 4、得到结果val result = rdd4.collect()result.foreach(item=>println(item))}
    }
    

Step 提交jar,虚拟机运行

cd /root/spark
bin/spark-submit --class cn.itcast.spark.rdd.WordCount --master spark://master:7077 /root/spark.jar

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/325550.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【大数据进阶第三阶段之Hive学习笔记】Hive安装

目录 1、环境准备 2、下载安装 3、配置环境变量 4、配置文件 4.1、配置hive-env.sh ​编辑4.2、配置hive-site.xml 5、上传配置jar 6、启动 1、环境准备 安装hadoop 以及 zookeeper、mysql 【大数据进阶第二阶段之Hadoop学习笔记】Hadoop 运行环境搭建-CSDN博客 《z…

pygame学习(二)——绘制线条、圆、矩形等图案

导语 pygame是一个跨平台Python库(pygame news)&#xff0c;专门用来开发游戏。pygame主要为开发、设计2D电子游戏而生&#xff0c;提供图像模块&#xff08;image&#xff09;、声音模块&#xff08;mixer&#xff09;、输入/输出&#xff08;鼠标、键盘、显示屏&#xff09;模…

云原生战专题 | 深入浅出分析云原生微服务的技术结构和架构设计

深入浅出分析云原生微服务的技术结构和架构设计 云原生容器技术背景容器编排Kubernetes控制平面的四大组件Kubernetes在容器编排中的设计要点 云原生微服务典型架构第一代微服务架构第二代微服务架构第三代微服务架构第四代微服务架构 未来的云原生架构 — Serverless 云原生容…

thinkphp学习04-控制器定义

控制器&#xff0c;即 controller&#xff0c;控制器文件存放在 controller 目录下&#xff1b; 如果想改变系统默认的控制器文件目录&#xff0c;可以在 config 下 route.php 配置&#xff1a; 将controller修改为controller123&#xff0c;就会报错&#xff0c;说明这个配置…

阿里云迁移AWS视频点播技术攻坚

文章目录 &#x1f437; 背景&#x1f9a5; 简述&#x1f425; Aws服务&#x1f99c; AWS CloudFormation&#x1f41e; 问题&#x1f409; 落地方案&#x1f989; Aws vs Aliyun&#x1f344; 避坑指南 &#x1f437; 背景 由于AWS整体成本略低于阿里云&#xff0c;公司决定将…

用C语言实现完全平方数计算【一题一策】第三期

题目&#xff1a;一个整数&#xff0c;它加上100后是一个完全平方数&#xff0c;再加上 168 又是一个完全平方数&#xff0c;请问该数是多少&#xff1f; 一、题目分析 首先假设该数为x&#xff0c;则x100y?&#xff0c;y为完全平方数。 然后加上168又是一个完全平方数&…

密码学(一)

文章目录 前言一、Cryptographic Primitives二、Cryptographic Keys2.1 Symmetric key cryptography2.2 asymmetric key cryptography 三、Confidentiality3.1 Symmetric key encryption algorithms3.2 asymmetric key block ciphers3.3 其他 四、Integrity4.1 symmetric key s…

选择排序!!!基础排序详解 C语言版

目录 1.什么是选择排序 2.选择排序源代码 3.优化代码 1.什么是选择排序 这是一个选择排序的流程图&#xff0c;其实很简单&#xff0c;就是每次挑选数字中最小的作为第一个 &#xff0c;直到整个数据有序就结束了 顾名思义&#xff0c;选择&#xff0c;那就是选取&#xff0c…

数据库期末重点

第一章&#xff1a; 1.数据库发展的三个阶段 第一代数据库系统、第二代数据库系统、新一代数据库系统 2.数据库系统发展的三个里程碑 IMS系统、DBTG报告、关系数据库系统 3.数据管理技术三个阶段 人工管理阶段(40年代中-50年代中) 文件系统阶段(50年代末-60年代中) 数据…

XD6500S一款串口SiP模块 射频LoRa芯片 内置sx1262

1.1产品介绍 XD6500S是一款集射频前端和LoRa射频于一体的LoRa SIP模块系列收发器SX1262 senies&#xff0c;支持LoRa⑧和FSK调制。LoRa技术是一种扩频协议优化低数据速率&#xff0c;超长距离和超低功耗用于LPWAN应用的通信。 XD6500S设计具有4.2 mA的有效接收电流消耗&#…

C++《异常》

前言&#xff1a;C有一套独立的异常处理机制,今天就来做详细的介绍try,catch这两个词等 在C语言中处理错误的方式和缺陷有&#xff1a; 返回错误码。 缺陷&#xff1a; 1.错误码不好设置&#xff0c;比如&#xff1a;除0操作&#xff0c;就不好返回错误码。如果返回一个数字&…

面试题:说一下Java开启异步线程的几种方法?

文章目录 整体描述实现方法一、注解Async1. 添加注解2. 创建异步方法Service和实现类3. 调用异步方法 二、AsyncManager1. 创建AsyncManager类2. 创建一个耗时的操作类3. 执行异步操作 三、线程池1. 创建线程池2. 创建一个耗时的操作类3. 执行线程池 总结 整体描述 在java中异…