Spark—通过Java、Scala API实现WordCount案例的基本操作

实验原理

Spark的核心就是RDD,所有在RDD上的操作会被运行在Cluster上,Driver程序启动很多Workers,Workers在(分布式)文件系统中读取数据后转化为RDD(弹性分布式数据集),然后对RDD在内存中进行缓存和计算。
在这里插入图片描述
而RDD有两种类型的操作 ,分别是Action(返回values)和Transformations(返回一个新的RDD)。
在这里插入图片描述

一、数据展示与前置准备

某电商网站记录了大量用户对商品的收藏数据,并将数据存储在名为buyer_favorite1的文件中,数据格式以及数据内容如下
在这里插入图片描述
在进行后续操作前,请先开启hadoop和spark服务。可以通过jps命令查看进程是否开启完整。

二、创建scala工程项目

1、开发环境:eclipse

打开已安装完Scala插件的Eclipse,新建一个Scala项目,命名为spark4。
在spark4项目下新建包名,命名为my.scala。将scala object命名为ScalaWordCount。
在这里插入图片描述

2、导入运行所需要的jar包。

右键项目,创建一个文件夹,名为lib。
将jar包导入进来,再右键jar包,点击Build Path=>Add to Build Path。(可以去我的资源里面下载spark1.x hadoop2.x)

3、编写Scala语句,并统计用户收藏数据中,每个用户收藏商品数量。
package my.scala  
import org.apache.spark.SparkConf  
import org.apache.spark.SparkContext  
object ScalaWordCount {  def main(args: Array[String]) {  //创建Spark的配置对象sparkConf,设置Spark程序运行时的配置信息; val conf = new SparkConf()  conf.setMaster("local")  .setAppName("scalawordcount")//创建SparkContext对象,SparkContext是Spark程序所有功能的唯一入口,无论采用Scala、Java还是Python都必须有一个SparkContext;  val sc = new SparkContext(conf)   val rdd = sc.textFile("hdfs://localhost:9000/myspark/buyer_favorite1")   //根据具体的数据来源,通过SparkContext来创建RDD;//对初始的RDD进行Transformation级别的处理。(首先将每一行的字符串拆分成单个的单词,然后在单词拆分的基础上对每个单词实例计数为1;//最后,在每个单词实例计数为1的基础上统计每个单词在文件出现的总次数)。rdd.map(line => (line.split("\t")(0), 1))  .reduceByKey(_ + _)  .collect()  .foreach(println)  sc.stop()  }  
}  

在控制界面console中查看的输出结果。在这里插入图片描述

三、创建Java工程项目

再次右键点击项目名,新建package,将包命名为my.java 。
右键点击包my.java,新建Class,命名为JavaWordCount。

1、编写Java代码,统计用户收藏数据中,每个用户收藏商品数量。
package my.java;  
import org.apache.spark.SparkConf;  
import org.apache.spark.api.java.JavaPairRDD;  
import org.apache.spark.api.java.JavaRDD;  
import org.apache.spark.api.java.JavaSparkContext;  
import org.apache.spark.api.java.function.FlatMapFunction;  
import org.apache.spark.api.java.function.Function2;  
import org.apache.spark.api.java.function.PairFunction;  
import scala.Tuple2;  
import java.util.Arrays;  
import java.util.List;  
import java.util.regex.Pattern;  
public final class JavaWordCount {  private static final Pattern SPACE = Pattern.compile("\t");  public static void main(String[] args) throws Exception {  SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("JavaWordCount");  JavaSparkContext ctx = new JavaSparkContext(sparkConf);  JavaRDD<String> lines = ctx.textFile("hdfs://localhost:9000/myspark/buyer_favorite1");  JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {  @Override  public Iterable<String> call(String s) {  String word[]=s.split("\t",2);  return Arrays.asList(word[0]);  }  });  JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {  @Override  public Tuple2<String, Integer> call(String s) {  return new Tuple2<String, Integer>(s, 1);  }  });  JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {  @Override  public Integer call(Integer i1, Integer i2) {  return i1 + i2;  }  });  List<Tuple2<String, Integer>> output = counts.collect();  System.out.println(counts.collect());  counts.saveAsTextFile("hdfs://localhost:9000/myspark/out");  ctx.stop();  }  
}
2、在linux终端查看输出结果

执行如下命令查看结果,前提是已启动集群

hadoop fs -cat /myspark/out/part-00000  
写在最后

由此可以看出,scala语言在编写spark程序时的优越性,简短精炼。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/15063.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FreeRTOS ~(七)互斥量 ~ (1/3)互斥量解决互斥缺陷

前情提要 FreeRTOS ~&#xff08;四&#xff09;同步互斥与通信 ~ &#xff08;2/3&#xff09;互斥的缺陷 FreeRTOS ~&#xff08;五&#xff09;队列的常规使用 ~ &#xff08;2/5&#xff09;队列解决互斥缺陷 FreeRTOS ~&#xff08;六&#xff09;信号量 ~ &#xff08;2/…

文字磨练课程:提高编辑和校对效率的方法

提高编辑和校对效率&#xff0c;可以使你更有效地完成写作任务&#xff0c;提升文章质量。以下是一些方法&#xff0c;可以帮助你在编辑和校对过程中提高效率。 1.设定目标和计划 在开始编辑和校对前&#xff0c;设定明确的目标和计划。这可以帮助你集中注意力&#xff0c;提…

瑞芯微:基于RKNN3568得ufldv2部署

Lane检测是自动驾驶和高级驾驶员辅助系统&#xff08;ADAS&#xff09;的基本组成部分&#xff0c;用于区分和定位道路上的车道线。尽管深度学习模型已经取得了巨大成功&#xff0c;但仍有一些重要且具有挑战性的问题需要解决。 第一个问题是效率问题。在实践中&#xff0c;车…

从JDK源码级别剖析JVM类加载机制

1 什么是Java虚拟机 一个可执行java字节码的虚拟机进程&#xff1b;跨平台的是java程序&#xff0c;而不是java虚拟机&#xff0c;java虚拟机在各个操作系统是不兼容的&#xff0c;例如windows、linux、mac都需要安装各自版本的虚拟机&#xff0c;java虚拟机通过jdk实现功能。…

docker-compose实现微服务jar+mysql的容器服务发布(经典版)

一 安装mysql服务 1.1 拉取镜像 1.拉取&#xff1a; docker pull mysql:5.7.29 2.查看镜像&#xff1a; docker images 1.2 在宿主机创建文件存储mysql 1.创建映射目录&#xff1a;mysql-c5 在/root/export/dockertest 目录下&#xff0c;mkdir -p mysql-c5 &#…

Elasticsearch介绍和安装

ELK简介 Elastic Stack核心产品包括Elasticsearch、Logstash、Kibana&#xff08;也称为ELK&#xff09;和Beats等等。能够安全可靠地获取任何来源、任何格式的数据&#xff0c;然后实时地对数据进行搜索、分析和可视化 Kibana是一个免费且开放的用户界面&#xff0c;能…

LVS +Keepalived高可用群集

文章目录 一、Keepalived概述二、Keepalived服务重要功能1.管理 LVS 负载均衡软件2.支持故障自动切换&#xff08;Failover&#xff09;3.实现 LVS 集群中节点的健康检查&#xff08;Health Checking&#xff09;4.VRRP通信原理 三、keepalived体系主要模块及作用四、keepalive…

LogicFlow 在HTML中的引入与使用

LogicFlow 在HTML中的引入与使用 LogicFlow的引入与使用&#xff0c;相较于BPMNJS相对容易一些&#xff0c;更加灵活一些&#xff0c;但是扩展代码可能写得更多一些。 示例展示 示例代码 github: https://github.com/iotzzh/origin-examples/blob/main/%E6%B5%81%E7%A8%8B%E5%9…

ADB 命令结合 monkey 的简单使用,超详细

一&#xff1a;ADB简介 1&#xff0c;什么是adb&#xff1a; ADB 全称为 Android Debug Bridge&#xff0c;起到调试桥的作用&#xff0c;是一个客户端-服务器端程序。其中客户端是用来操作的电脑&#xff0c;服务端是 Android 设备。ADB 也是 Android SDK 中的一个工具&#…

C++中的继承/虚继承原理

C中的继承 文章目录 C中的继承1.继承的概念和定义1.1 继承定义1.12 继承关系和访问限定符2.基类和派生类对象的复制转换3.继承中的作用域4.派生类的默认成员函数继承与友元 6.**继承与静态成员****复杂的菱形继承及菱形虚拟继承**7.虚继承解决数据冗余和二义性的原理 1.继承的概…

亚马逊云科技Zero ETL数据库,助力企业走向数据驱动的业务增长之路

据Forrester研究&#xff0c;相对于数据应用不够成熟的公司&#xff0c;那些有效获取业务洞察的公司&#xff0c;有高达8.5倍的可能性实现至少20%的收入增长。然而&#xff0c;要实现这一增长&#xff0c;需要简化一项流程——在数据分析前管理和准备好数据。这就是为什么亚马逊…

Java动态规划LeetCode1137. 第 N 个泰波那契数

方法1&#xff1a;通过动态规划解题&#xff0c;这道题也是动态规划的一道很好的入门题&#xff0c;因为比较简单和容易理解。 代码如下&#xff1a; public int tribonacci(int n) {//处理特殊情况if(n0){return 0;}if(n1||n2){return 1;}//定义数组int[]dpnew int[n1];//初…