Spark-用IDEA编写wordcount demo

配置

Spark版本:3.2.0

Scala版本:2.12.12

JDK:1.8

Maven:3.6.3

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.zzjz.Spark</groupId><artifactId>Spark</artifactId><version>1.0</version><properties><spark.version>3.2.0</spark.version><scala.version>2.12</scala.version></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_${scala.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_${scala.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${scala.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_${scala.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_${scala.version}</artifactId><version>${spark.version}</version></dependency></dependencies><build><plugins><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.6.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.19</version><configuration><skip>true</skip></configuration></plugin></plugins></build></project>

样例数据

9422850591,11603,39939,山西,邮件,人员
9422850591,116427,39911,山西,邮件,人员
9422850591,116437,39895,山西,邮件,人员

代码

import org.apache.spark.{SparkConf, SparkContext}  // 导入SparkConf和SparkContext类
object wcPerson {def main (args:Array[String]): Unit ={// 创建SparkConf对象,设置应用程序名称为"wcPerson",运行模式为本地模式,使用一个CPU核心val conf = new SparkConf().setAppName("wcPerson").setMaster("local[1]")  // 创建SparkContext对象,与Spark集群进行通信val sc = new SparkContext(conf) // 加载文件,将每一行作为一个字符串元素,返回一个RDD val inputFile = sc.textFile("D:\\workspace\\spark\\src\\main\\Data\\person")  // 对RDD应用flatMap转换操作,将每一行按","分割成多个单词,并将所有单词扁平化为一个RDDval wc = inputFile.flatMap(line => line.split(","))  // 对RDD应用map转换操作,将每个单词映射为(key, value)的元组,其中key为单词本身,value为1.map(word => (word,1)) // 对相同key的元组进行聚合操作,将相同key的value相加 .reduceByKey((a,b) => a + b) // 打印输出聚合结果 wc.foreach(println)  }
}

运行结果

D:\Java\jdk1.8.0_131\bin\java.exe "-javaagent:D:\idea\IntelliJ IDEA 2021.1.3\lib\idea_rt.jar=52283:D:\idea\IntelliJ IDEA 2021.1.3\bin" -Dfile.encoding=UTF-8 -classpath "D:\idea\IntelliJ IDEA 2021.1.3\lib\idea_rt.jar" com.intellij.rt.execution.CommandLineWrapper C:\Users\Administrator\AppData\Local\Temp\idea_classpath1156784809 wcPerson
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/spark/spark-3.2.0-bin-hadoop2.7/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/Maven/Maven_repositories/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
23/07/11 10:00:01 INFO SparkContext: Running Spark version 3.2.0
23/07/11 10:00:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/07/11 10:00:02 INFO ResourceUtils: ==============================================================
23/07/11 10:00:02 INFO ResourceUtils: No custom resources configured for spark.driver.
23/07/11 10:00:02 INFO ResourceUtils: ==============================================================
23/07/11 10:00:02 INFO SparkContext: Submitted application: wcPerson
23/07/11 10:00:02 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/07/11 10:00:02 INFO ResourceProfile: Limiting resource is cpu
23/07/11 10:00:02 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/07/11 10:00:02 INFO SecurityManager: Changing view acls to: Administrator
23/07/11 10:00:02 INFO SecurityManager: Changing modify acls to: Administrator
23/07/11 10:00:02 INFO SecurityManager: Changing view acls groups to: 
23/07/11 10:00:02 INFO SecurityManager: Changing modify acls groups to: 
23/07/11 10:00:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(Administrator); groups with view permissions: Set(); users  with modify permissions: Set(Administrator); groups with modify permissions: Set()
23/07/11 10:00:07 INFO Utils: Successfully started service 'sparkDriver' on port 52323.
23/07/11 10:00:07 INFO SparkEnv: Registering MapOutputTracker
23/07/11 10:00:07 INFO SparkEnv: Registering BlockManagerMaster
23/07/11 10:00:07 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/07/11 10:00:07 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/07/11 10:00:07 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/07/11 10:00:07 INFO DiskBlockManager: Created local directory at C:\Users\Administrator\AppData\Local\Temp\blockmgr-0052575b-7c9f-457e-9ed5-fb50af59f965
23/07/11 10:00:07 INFO MemoryStore: MemoryStore started with capacity 623.4 MiB
23/07/11 10:00:07 INFO SparkEnv: Registering OutputCommitCoordinator
23/07/11 10:00:07 INFO Utils: Successfully started service 'SparkUI' on port 4040.
23/07/11 10:00:08 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://zzjz:4040
23/07/11 10:00:08 INFO Executor: Starting executor ID driver on host zzjz
23/07/11 10:00:08 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52339.
23/07/11 10:00:08 INFO NettyBlockTransferService: Server created on zzjz:52339
23/07/11 10:00:08 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/07/11 10:00:08 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, zzjz, 52339, None)
23/07/11 10:00:08 INFO BlockManagerMasterEndpoint: Registering block manager zzjz:52339 with 623.4 MiB RAM, BlockManagerId(driver, zzjz, 52339, None)
23/07/11 10:00:08 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, zzjz, 52339, None)
23/07/11 10:00:08 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, zzjz, 52339, None)
23/07/11 10:00:10 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 244.0 KiB, free 623.2 MiB)
23/07/11 10:00:10 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 23.4 KiB, free 623.1 MiB)
23/07/11 10:00:10 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on zzjz:52339 (size: 23.4 KiB, free: 623.4 MiB)
23/07/11 10:00:10 INFO SparkContext: Created broadcast 0 from textFile at wcPerson.scala:7
23/07/11 10:00:10 INFO FileInputFormat: Total input paths to process : 1
23/07/11 10:00:10 INFO SparkContext: Starting job: foreach at wcPerson.scala:10
23/07/11 10:00:11 INFO DAGScheduler: Registering RDD 3 (map at wcPerson.scala:9) as input to shuffle 0
23/07/11 10:00:11 INFO DAGScheduler: Got job 0 (foreach at wcPerson.scala:10) with 1 output partitions
23/07/11 10:00:11 INFO DAGScheduler: Final stage: ResultStage 1 (foreach at wcPerson.scala:10)
23/07/11 10:00:11 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
23/07/11 10:00:11 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
23/07/11 10:00:11 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at wcPerson.scala:9), which has no missing parents
23/07/11 10:00:11 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 6.9 KiB, free 623.1 MiB)
23/07/11 10:00:11 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.0 KiB, free 623.1 MiB)
23/07/11 10:00:11 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on zzjz:52339 (size: 4.0 KiB, free: 623.4 MiB)
23/07/11 10:00:11 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1427
23/07/11 10:00:11 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at wcPerson.scala:9) (first 15 tasks are for partitions Vector(0))
23/07/11 10:00:11 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks resource profile 0
23/07/11 10:00:11 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (zzjz, executor driver, partition 0, PROCESS_LOCAL, 4503 bytes) taskResourceAssignments Map()
23/07/11 10:00:11 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
23/07/11 10:00:12 INFO HadoopRDD: Input split: file:/D:/workspace/spark/src/main/Data/person:0+135
23/07/11 10:00:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1325 bytes result sent to driver
23/07/11 10:00:12 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1022 ms on zzjz (executor driver) (1/1)
23/07/11 10:00:12 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
23/07/11 10:00:12 INFO DAGScheduler: ShuffleMapStage 0 (map at wcPerson.scala:9) finished in 1.263 s
23/07/11 10:00:12 INFO DAGScheduler: looking for newly runnable stages
23/07/11 10:00:12 INFO DAGScheduler: running: Set()
23/07/11 10:00:12 INFO DAGScheduler: waiting: Set(ResultStage 1)
23/07/11 10:00:12 INFO DAGScheduler: failed: Set()
23/07/11 10:00:12 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[4] at reduceByKey at wcPerson.scala:9), which has no missing parents
23/07/11 10:00:12 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 5.3 KiB, free 623.1 MiB)
23/07/11 10:00:12 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.1 KiB, free 623.1 MiB)
23/07/11 10:00:12 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on zzjz:52339 (size: 3.1 KiB, free: 623.4 MiB)
23/07/11 10:00:12 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1427
23/07/11 10:00:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[4] at reduceByKey at wcPerson.scala:9) (first 15 tasks are for partitions Vector(0))
23/07/11 10:00:12 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks resource profile 0
23/07/11 10:00:12 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1) (zzjz, executor driver, partition 0, NODE_LOCAL, 4271 bytes) taskResourceAssignments Map()
23/07/11 10:00:12 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
23/07/11 10:00:12 INFO ShuffleBlockFetcherIterator: Getting 1 (142.0 B) non-empty blocks including 1 (142.0 B) local and 0 (0.0 B) host-local and 0 (0.0 B) push-merged-local and 0 (0.0 B) remote blocks
23/07/11 10:00:12 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 20 ms
(116437,1)
(39911,1)
(116427,1)
(9422850591,3)
(39895,1)
(山西,3)
(11603,1)
(39939,1)
(人员,3)
(邮件,3)
23/07/11 10:00:12 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1224 bytes result sent to driver
23/07/11 10:00:12 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 123 ms on zzjz (executor driver) (1/1)
23/07/11 10:00:12 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
23/07/11 10:00:12 INFO DAGScheduler: ResultStage 1 (foreach at wcPerson.scala:10) finished in 0.153 s
23/07/11 10:00:12 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
23/07/11 10:00:12 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
23/07/11 10:00:12 INFO DAGScheduler: Job 0 finished: foreach at wcPerson.scala:10, took 2.044775 s
23/07/11 10:00:12 INFO SparkContext: Invoking stop() from shutdown hook
23/07/11 10:00:12 INFO SparkUI: Stopped Spark web UI at http://zzjz:4040
23/07/11 10:00:12 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/07/11 10:00:12 INFO MemoryStore: MemoryStore cleared
23/07/11 10:00:12 INFO BlockManager: BlockManager stopped
23/07/11 10:00:12 INFO BlockManagerMaster: BlockManagerMaster stopped
23/07/11 10:00:12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/07/11 10:00:12 INFO SparkContext: Successfully stopped SparkContext
23/07/11 10:00:12 INFO ShutdownHookManager: Shutdown hook called
23/07/11 10:00:12 INFO ShutdownHookManager: Deleting directory C:\Users\Administrator\AppData\Local\Temp\spark-9f3eb32d-30f7-44d6-8751-f668b2710d89Process finished with exit code 0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/20585.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Coggle 30 Days of ML (23年7月)任务二:数据可视化

Coggle 30 Days of ML (23年7月&#xff09;任务二&#xff1a;数据可视化 任务二&#xff1a;对数据集字符进行可视化&#xff0c;统计标签和字符分布 说明&#xff1a;在这个任务中&#xff0c;需要使用Pandas库对数据集的字符进行可视化&#xff0c;并统计数据集中的标签和…

[PyTorch][chapter 44][时间序列表示方法4]

前言&#xff1a; 训练复杂度 OE*T*Q 参数 全称 E 迭代次数 Number of the training epochs T数据集大小 Number of the words in the training set Q 模型计算复杂度 Model computational complexity E,T 一般都认为相同&#xff0c;所以这里面主要讨论Q&#xff0c;模…

驱动day6

驱动程序 #include <linux/init.h> #include <linux/module.h> #include <linux/of.h> #include <linux/of_irq.h> #include <linux/of_gpio.h> #include <linux/platform_device.h> #include <linux/mod_devicetable.h> #include …

LayUI 实现二级导航栏

目录 实现步骤&#xff1a; 1. 分析数据库 2. 构建数据源 2.1 编写实体类 2.2 编写节点实体类 2.3 构建BuildTree节点结构方法类 2.4 编写dao类 2.5 编写数据Acntion控制类 3. 前台准备 3.1 配置mvc.xml文件 3.2 页面编写 3.3 运行效果 实现步骤&#xff1a; 1. 分…

Linux系统使用(超详细)

目录 Linux操作系统简介 Linux和windows区别 Linux常见命令 Linux目录结构 Linux命令提示符 常用命令 ls cd pwd touch cat echo mkdir rm cp mv vim vim的基本使用 grep netstat Linux面试题 Linux操作系统简介 Linux操作系统是和windows操作系统是并列…

在线试用Stable Diffusion生成可爱的图片

文章目录 一、 Stable Diffusion 模型在线使用地址&#xff1a;二、模型相关版本和参数配置&#xff1a;三、图片生成提示词与反向提示词&#xff1a;提示词1提示词2提示词3提示词4提示词5 一、 Stable Diffusion 模型在线使用地址&#xff1a; https://inscode.csdn.net/insc…

centos7 环境下部署 nacos单机模式

1、官网下载 nacos 官网地址&#xff1a;home 去github上下载nacos-server。我下载的是 nacos-server-1.4.1.tar.gz 2、安装 nacos 下载完成后&#xff0c;将安装包上传到 centos 创建 nacos 目录&#xff08;安装位置任意&#xff09; mkdir -p /usr/local/nacos解压 nac…

实战打靶集锦-021-glasgowsmile

提示&#xff1a;本文记录了博主的一次曲折的打靶经历。 目录 1. 主机发现2. 端口扫描3. 服务枚举4. 服务探查4.1 手工访问4.2 目录枚举4.3 手工探查4.4 搜索EXP4.5 joomlascan4.6 用户猜测与密码爆破4.7 构建反弹shell 5. 提权5.1 优化shell5.2 枚举系统信息5.3 探查/etc/pass…

第24章:事务基础知识

一、数据库事务Transactions 1.为什么要使用事务 事务可以让数据库保持一致性&#xff0c;通过事务的机制恢复到某个时间点&#xff0c;即使系统崩溃数据库修改的数据不会丢失。 2.存储引擎支持事务的情况 命令: show engines; 只有InnoDB支持事务 3.事务基本概念 事务&a…

【Distributed】分布式ELK日志文件分析系统(一)

文章目录 一、ELK 概述1. 为什么要使用 ELK2. 完整日志系统基本特征3. ELK 简介3.1 ElasticSearch&#xff08;ES&#xff09;3.2 Kiabana3.3 Logstash3.4 其它组件Filebeat缓存/消息队列Fluentd 4. ELK 的工作原理5. Linux 系统内核日志消息的优先级别 二、 部署 ELK 集群服务…

JAVA开发(JAVA视频监控接口相关)

一、背景 最近在做视频监控接口相关的开发&#xff0c;需要调用视频的接口获取直播地址&#xff0c;回放地址&#xff0c;然后集成到web里查看。 二、涉及的接口 1、获取卡口的id 2、通过卡口id获取通道&#xff08;设备的id&#xff09; 3、通过设备的id获取到直播地址 4…

密码学入门——HMAC

文章目录 一、什么是HMAC二、HMAC的步骤 一、什么是HMAC HMAC是一种使用单向散列函数来构造消息认证码的方法(RFC2104)&#xff0c;其中 HMAC的H就是Hash的意思。 HMAC 中所使用的单向散列函数并不仅限于一种&#xff0c;任何高强度的单向散列函数都可以被用于HMAC&#xff0…