hadoop --- MapReduce

MapReduce定义:

MapReduce可以分解为Map (映射) + Reduce (规约) , 具体过程: 

  1.  Map : 输入数据集被切分成多个小块,并分配给不同的计算节点进行处理
  2. Shuffle and Sort:洗牌和排序,在 Map 阶段结束后,将每个 Mapper 生成的键值对按照键进行排序,并将相同键的值归并在一起,并将相同的键发送给后续的reduce
  3. Reduce: 规约计算,每个计算节点独立处理它们的键值对,并生成最终的输出结果。

 MapReduce是一个分布式运算程序的编程框架,用于用户开发“基于Hadoop的数据分析应用”的核心框架。f核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

优点:

  1. 易于编程:用户只关心,业务逻辑。实现框架的接口。
  2. 良好扩展性:可以动态增加服务器,解决计算资源不够问题
  3. 高容错性:任何一台机器挂掉,可以将任务转移到其他节点。
  4. 并行处理:能够有效地利用集群中多个计算节点进行并行计算,提高处理速度。
  5. 适合海量数据计算(TB、PB)几千台服务器共同计算

缺点:

  1. 不擅长实时计算。Mysql
  2. 不擅长流式计算。Sparkstreaming flink
  3. 不擅长DAG有向无环图计算。Spark 

MapReduce架构: 

        MapReduce中,执行MapReduce任务的机器角色有两种: JobTracker 和 TaskTracker, 其中JobTracker 用于任务调度, TaskTracker用于执行任务。 一个Hadoop集群中, 只有一台JobTracker。

 当Client向JobTracker提交作业时, JobTracker会讲作业拆分到多个TaskTracker去执行, TaskTracker会定时发送心跳信息,如果一段时间JobTracker未收到TaskTracker的心跳信息,则认定该TaskTracker出现故障, 会讲该TaskTracker的任务分配给其他TackTracker。

MapReduce执行过程: 

  1.  客户端启动一个job
  2. 客户端向JobTracker请求一个JobID
  3. JobClient讲运行作业所需要的资源复制到HDFS上, 包括jar文件、配置文件、客户端计算所得的输入划分信息,并存档在以JobID 为名的文件夹中。
  4. JobClient 提交任务给JobTracker.
  5. JobTracker 调度作业,并根据输入划分信息为每一个划分创建一个map任务,并将map任务分配给taskTracker执行。【图中的5/6步骤】
  6. TaskTracker每隔一段时间给JobTracker发送一个Heartbeat告诉JobTracker它仍然在运行,同时心跳还携带很多比如map任务完成的进度等信息。当JobTracker收到作业的最后一个任务完成信息时,便把作业设置成“成功”,JobClient再传达信息给用户。

MapReduce wordCount的案例: 

hadoop数据类型
Java类型Hadoop Writable类型
BooleanBooleanWritable
ByteByteWritable
IntIntWritable
FloatFloatWritable
LongLongWritable
DoubleDoubleWritable
StringText
MapMapWritable
ArrayArrayWritable
NullNullWritabl

统计文档中单词出现的频次

1、引入pom依赖: 

    <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency> 

2、序列化类 Writable

【当前案例中并未使用到自定义的序列化接口】

Hadoop有自己的序列化机制--Writable, 相比于Java的序列化,hadoop的序列化更紧凑、快速、支持多语言。

Hadoop的序列化步骤: 

  1. 实现Writable接口
  2. 反序列化时需要调用无参构造,所以序列化对象必须要有无参构造
  3. 重写序列化方法write() 
  4. 重写反序列化方法readFidlds()
  5. 反序列化的顺序和序列化的顺序必须完全一致 
  6. 重写toString() ,将结果显示在文件中 
  7. 实现Comparable接口,将自定义的序列化对象放在key中传输
//1 实现Writable接口
@Data
public class FlowBeanWritable implements Writable, Comparable<FlowBeanWritable> {private long upFlow; private long downFlow; private long sumFlow; //2 提供无参构造public FlowBeanWritable() { }//4 实现序列化和反序列化方法,注意顺序一定要保持一致@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeLong(upFlow);dataOutput.writeLong(downFlow);dataOutput.writeLong(sumFlow);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.upFlow = dataInput.readLong();this.downFlow = dataInput.readLong();this.sumFlow = dataInput.readLong();}//5 重写ToString@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}// 6 如果作为Key传输,则还需要实现compareTo方法@Overridepublic int compareTo(FlowBeanWritable o) {// 倒序排列,从大到小return this.sumFlow > o.getSumFlow() ? -1 : 1;}
}

3、编写Mapper 类,实现Mapper接口

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {private Text outK = new Text();private IntWritable outV = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {// 1 获取一行并将其转成String类型来处理String line = value.toString();// 2 将String类型按照空格切割后存进String数组String[] words = line.split(" ");// 3 依次取出单词,将每个单词和次数包装成键值对,写入context上下文中供后续调用for (String word : words) {// 先将String类型,转为text,再包装成健值对outK.set(word);context.write(outK, outV);}}
}

Mapper<LongWritable, Text, Text, IntWritable> 泛型里面有四个类, 这里其实是两对键值对:

  1. LongWritable 、Text :表示输入数据,LongWritable表示数据的索引,类似于第几行数据; Text表示读取的文件内容。一般使用系统默认的键值对。
  2. Text、IntWritable: 表示输出数据, Text表示输入的单词, IntWritable表示该单词出现的次数。这个键值对需要根据业务需求来确定。

4、编写Reducer类,继承Reduce抽象类

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {IntWritable outV = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}outV.set(sum);//写出context.write(key,outV);}
}

Reducer<Text, IntWritable, Text, IntWritable>  泛型里面有四个类, 这里也是两对键值对:

  • Text, IntWritable:第一个键值对,要跟Mapper的输出泛型保持一致
  • Text, IntWritable:第二个键值对,表示输出的结果数据,因为这里要输出的是单词出现的次数,所以还是 Text、IntWritable 类型 

Reduce是每组会执行一次,就是相同的key是会分到同一组的,所以此处只需计算每个key的count叠加即可

5、编写Driver驱动类 


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;public class WordCountDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 获取配置信息以及job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 关联当前Driver程序的jarjob.setJarByClass(WordCountDriver.class);// 指定Mapper和Reducerjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);// 设置输入、输出的k、v类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 设置输入输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 将job提交给yarn运行Boolean result = job.waitForCompletion(Boolean.TRUE);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/14887.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

15.1 BP神经网络实现图像压缩——了解神经网络在图像处理方面的应用(matlab程序)

1.简述 BP神经网络现在来说是一种比较成熟的网络模型了,因为神经网络对于数字图像处理的先天优势,特别是在图像压缩方面更具有先天的优势,因此,我这一段时间在研究神经网络的时候同时研究了一下关于BP网络实现图像压缩的原理和过程,并且是在MATLAB上进行了仿真的实验,结果发现设…

TinyStories: How Small Can Language Models Be and Still Speak Coherent English?

本文是LLM系列的文章之一&#xff0c;针对《TinyStories: How Small Can Language Models Be and Still Speak Coherent English?》的翻译。 TinyStories&#xff1a;语言模型能有多小&#xff0c;还能说连贯的英语&#xff1f; 摘要1 引言2 TinyStories数据集的描述2.1 Tiny…

3D模型轻量化开发工具HOOPS与WebGL的对比分析

HOOPS是一种商业级的3D开发平台&#xff0c;由Tech Soft 3D公司开发。它提供了一套全面的工具和API&#xff0c;用于构建和展示高度复杂的3D场景和模型&#xff0c;可以在多个平台和环境中使用&#xff0c;包括Web、移动设备和桌面&#xff0c;这使得开发者能够在不同的设备上展…

UE5接入在线直播视频源,如hls(m3u8)格式

文章目录 1.实现目标2.实现过程2.1 VlcMedia插件重编译2.2 UE5接入在线直播2.3 创建材质3.参考资料1.实现目标 通过重编译VlcMedia插件,以支持在线直播视频在UE5中的播放,GIF动图如下: 2.实现过程 本文主要包括插件的重编译、在线直播视频的接入,以及材质的创建三个部分。…

ELK部署安装

目录 一、环境准备 1.准备三台服务器&#xff08;带图形化的linuxCentOS7&#xff0c;最小化缺少很多环境&#xff09; 2.修改主机名 3.关闭防火墙 4.elk-node1、elk-node2 用系统自带的java 5.上传软件包到node1和node2 二、部署elasticsearch 1、node1、node2操作 2.no…

Coggle 30 Days of ML(23年7月)任务四:线性模型训练与预测

Coggle 30 Days of ML&#xff08;23年7月&#xff09;任务四&#xff1a;线性模型训练与预测 任务四&#xff1a;使用TFIDF特征和线性模型完成训练和预测 说明&#xff1a;在这个任务中&#xff0c;你需要使用TFIDF特征和线性模型&#xff08;如逻辑回归&#xff09;完成训练…

Jmeter做单接口测试-超详细步骤讲解

测试项目&#xff1a;本章节将以此测试项目为大家讲解怎么使用jmeter做一个接口测试 CSDN - 专业开发者社区CSDN是全球知名中文IT技术交流平台,创建于1999年,包含原创博客、精品问答、职业培训、技术论坛、资源下载等产品服务,提供原创、优质、完整内容的专业IT技术开发社区.h…

基于梯度下降算法的无约束函数极值问题求解

基于梯度下降算法的无约束函数极值问题求解 1 知识预警1.1导数1.2偏导数1.3方向导数1.4梯度 2 梯度下降算法3 无约束函数极值问题求解3.1 算例13.1.1 Python编程求解3.1.2 求解结果与可视化 3.2 算例2 Rosenbrock函数3.2.1 Python编程求解3.2.2 求解结果与可视化 1 知识预警 1…

多元分类预测 | Matlab 麻雀算法(SSA)优化xgboost的分类预测,多特征输入模型,SSA-xgboost分类预测模型

文章目录 效果一览文章概述部分源码参考资料效果一览 文章概述 多元分类预测 | Matlab 麻雀算法(SSA)优化xgboost的分类预测,多特征输入模型,SSA-xgboost分类预测模型 多特征输入单输出的二分类及多分类模型。程序

自动化漏洞挖掘方式

自动化漏洞挖掘方式 一、Goby安装使用1.1、goby简介1.2、goby下载安装1.3、简单扫描1.4、Goby插件 二、Xray安装使用2.1、XRAY简介2.2、Xray安装2.3、Xray使用2.4、爬虫模式&#xff08;主动扫描&#xff09;2.5、被动扫描2.6、BurpSuite联动Xray2.7、Rad联动Xray 一、Goby安装…

短视频抖音账号矩阵系统源码开发者自研(二)

一、短视频账号矩阵系统源码开发储存集群化开发运转更快 短视频账号矩阵系统源码开发采用储存集群化开发&#xff0c;可以显著提高系统的运转速度和效率。通过优化储存结构和算法&#xff0c;系统能够更快地处理短视频账号数据&#xff0c;确保用户能够流畅地浏览和观看短视频…

Docker中部署Redis集群与部署微服务项目的详细过程

目录 一、使用Docker部署的好处二、Docker 与 Kubernetes 对比三、Redis集群部署实战四、Spring Boot项目 打包镜像?小结 一、使用Docker部署的好处 Docker的好处在于&#xff1a;在不同实例上运行相同的容器 Docker的五大优点&#xff1a; 持续部署与测试、多云服务平台支…