【Hadoop】MapReduce详解

🦄 个人主页——🎐开着拖拉机回家_大数据运维-CSDN博客 🎐✨🍁

🪁🍁🪁🍁🪁🍁🪁🍁 🪁🍁🪁🍁🪁🍁🪁 🪁🍁🪁🍁🪁🍁🪁🍁🪁🍁🪁🍁

感谢点赞和关注 ,每天进步一点点!加油!

目录

一、MapReduce概述

1. 1 MapReduce 介绍

1.2 MapReduce 定义

1.3 MapReduce优缺点

1.2.1.优点

1.2.2.缺点

1.4 MapReduce框架结构

二、WordCount 案例

三、MapReduce的运行机制详解

3.1 MapTask 工作机制

3.2 ReduceTask 工作机制

3.3 Shuffle 过程


一、MapReduce概述


1. 1 MapReduce 介绍


MapReduce思想在生活中处处可见。MapReduce 的思想核心是“分而治之”,适用于大规模数据处理场景。

  • Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
  • Reduce负责“合”,即对map阶段的结果进行全局汇总。
  • MapReduce运行在YARN集群。
     

这两个阶段合起来正是MapReduce思想的体现。

1.2 MapReduce 定义


MapReduce是面向大数据并行处理的计算模型、框架和平台,它隐含了以下三层含义:

(1)MapReduce是一个基于集群的高性能并行计算平台(Cluster Infrastructure)。它允许用市场上普通的商用服务器构成一个包含数十、数百至数千个节点的分布和并行计算集群。

(2)MapReduce是一个并行计算与运行软件框架(Software Framework)它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理,大大减少了软件开发人员的负担。

(3)MapReduce是一个并行程序设计模型与方法(Programming Model & Methodology)。它借助于函数式程序设计语言Lisp的设计思想,提供了一种简便的并行程序设计方法,用Map和Reduce两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理 [百度百科] 。

1.3 MapReduce优缺点


1.2.1.优点

(1)MapReduce易于编程

它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。

(2)良好的扩展性

当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

(3)高容错性

MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。

(4)适合PB级以上海量数据的离线处理

可以实现上千台服务器集群并发工作,提供数据处理能力。

1.2.2.缺点

(1)不擅长实时计算

MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果,更多的适合离线或者T+1的任务。

(2)不擅长流式计算

流式计算的输入数据是动态的, 如Flink或者Spark Streaming,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。

(3)不擅长DAG(有向无环图)计算

多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

1.4 MapReduce框架结构


一个完整的mapreduce程序在分布式运行时有三类实例进程:

  • MR AppMaster负责管理MR作业的生命周期及状态协调, 一般指的是Yarn中AppMaster,针对MapReduce计算框架就是MR AppMaster,它使得MapReduce计算框架可以运行与YARN之上;
  • MapTask负责map阶段的整个数据处理流程;
  • ReduceTask负责reduce阶段的整个数据处理流程。


二、WordCount 案例


数据格式准备如下:

vim wordcount.txt     
hello I am ok   
hadoop hadoop
hello world
hello flume
hadoop hive
hive kafka
flume storm
hive oozie
hadoop hbase
hadoop flink
hive azkaban

将数据上传到HDFS

hdfs dfs -mkdir -p  /kangll/workcount
hdfs dfs   -put wordcount.txt   /kangll/workcount

代码示例

package com.kangna.mapreducer;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;/********************************* @Author: kangna* @Date: 2020/1/25 11:14* @Version: 1.0* @Desc:********************************/
public class WordCountMain {public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {private Text word = new Text();private IntWritable one = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 取到一行个数据String line = value.toString();// 按照空格切分String[] words = line.split(" ");// 遍历数据for (String word : words) {this.word.set(word);context.write(this.word, this.one);}}}public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable > {private IntWritable total = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {// 作累加int sum = 0;for (IntWritable value : values) {sum += value.get();}// 包装 结构并输出total.set(sum);context.write(key, total);}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 1. 获取一个 Job 实例Job job = Job.getInstance(new Configuration());// 2. 设置  类的路径job.setJarByClass(WordCountMain.class);// 3. 设置 Mapper 和 Reducerjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);// 4. 设置 Mapper 和  Reducer 的输出类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 5. 设置输入输出数据FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 6. 提交Jobboolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}

打包在集群中运行。


三、MapReduce的运行机制详解



3.1 MapTask 工作机制

Map阶段流程大体如上图

简单概述:InputFile 通过 split 被逻辑切分为多个split文件,通过Record按行读取内容给 map(用户自己实现的)进行处理,数据被 map 处理结束之后交给 OutputCollector 收集器,对其结果key进行分区(默认使用hash分区),然后写入buffer,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。

详细步骤:

  1. Split阶段读取数据组件 InputFormat (默认 TextInputFormat) 会通过 getSplits 方法对输入目录中文件进行逻辑切片规划得到 block, 有多少个 block就对应启动多少个 MapTask
  2. Read阶段:将输入文件切分为 block 之后, 由 RecordReader 对象 (默认是LineRecordReader) 进行读取, 以 \n 作为分隔符, 读取一行数据, 返回 <key,value>. Key 表示每行首字符偏移值, Value 表示这一行文本内容
  3. Map阶段读取 block 返回 <k ey,value>, 进入用户自己继承的 Mapper 类中,执行用户重写的 map 函数, RecordReader 读取一行这里调用一次
  4. Collection收集阶段 Mapper 逻辑结束之后, 将 Mapper 的每条结果通过 context.write 进行collect数据收集. 在 collect 中, 会先对其进行分区处理,默认使用 HashPartitioner

MapReduce 提供 Partitioner 接口, 它的作用就是根据 Key 或 Value 及 Reducer 的数量来决定当前的这对输出数据最终应该交由哪个 Reduce task 处理, 默认对 Key Hash 后再以 Reducer 数量取模. 默认的取模方式只是为了平均 Reducer 的处理能力, 如果用户自己对 Partitioner 有需求, 可以订制并设置到 Job 上

       5. 接下来, 会将数据写入内存, 内存中这片区域叫做环形缓冲区, 缓冲区的作用是批量收集 Mapper 结果, 减少磁盘 IO 的影响. 我们的 <Key,Value> 对以及 Partition 的结果都会被写入缓冲区. 当然, 写入之前,Key 与 Value 值都会被序列化成字节数组

环形缓冲区其实是一个数组, 数组中存放着 Key, Value 的序列化数据和 Key, Value 的元数据信息, 包括 Partition, Key 的起始位置, Value 的起始位置以及 Value 的长度. 环形结构是一个抽象概念。
缓冲区是有大小限制, 默认是 100MB. 当 Mapper 的输出结果很多时, 就可能会撑爆内存, 所以需要在一定条件下将缓冲区中的数据临时写入磁盘, 然后重新利用这块缓冲区. 这个从内存往磁盘写数据的过程被称为 Spill, 也称为溢写. 这个溢写是由单独线程来完成,整个缓冲区有个溢写的比例 spill.percent. 这个比例默认是 0.8, 也就是当缓冲区的数据已经达到阈值 buffer size * spill percent = 100MB * 0.8 = 80MB, 溢写线程启动, 锁定这 80MB 的内存, 执行溢写过程. Mapper 的输出结果还可以往剩下的 20MB 内存中写, 互不影响

        6. Spill阶段即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。当溢写线程启动后, 将数据写入本地磁盘之前,需要对这 80MB 空间内的 Key 做排序 (Sort). 排序是 MapReduce 模型默认的行为, 这里的排序也是对序列化的字节做的排序。

如果 Job 设置过 Combiner, 会将有相同 Key 的 <key, value> 对的 Value 合并在起来, 减少溢写到磁盘的数据量。 Combiner 会优化 MapReduce 的中间结果, Combiner 的输出是 Reducer 的输入, Combiner 绝不能改变最终的计算结果。 Combiner 只应该用于那种 Reduce 的输入 <key, value> 与输出 <key, value> 类型完全一致, 且不影响最终结果的场景. 比如累加, 最大值等。

        7. Merge阶段 : 每次溢写会在磁盘上生成一个临时文件 (写之前判断是否有 Combiner),如果 Mapper 的输出结果真的很大, 有多次这样的溢写发生, 磁盘上相应的就会有多个临时文件存在. 当整个数据处理结束之后开始对磁盘中的临时文件进行 Merge 合并, 因为最终的文件只有一个, 写入磁盘, 并且为这个文件提供了一个索引文件, 以记录每个reduce对应数据的偏移量

【mapTask的一些基础设置配置】

配置

默认值

解释

mapreduce.task.io.sort.mb

100

设置环型缓冲区的内存值大小

mapreduce.map.sort.spill.percent

0.8

设置溢写的比例

mapreduce.cluster.local.dir

${hadoop.tmp.dir}/mapred/local

溢写数据目录

mapreduce.task.io.sort.factor

10

设置一次合并多少个溢写文件

3.2 ReduceTask 工作机制


简单概述:Reduce 大致分为 copy、sort、reduce 三个阶段,重点在前两个阶段。copy 阶段包含一个 eventFetcher 来获取已完成的 map 列表,由 Fetcher 线程去 copy 数据,在此过程中会启动两个 merge 线程,分别为 inMemoryMerger 和 onDiskMerger,分别将内存中的数据 merge 到磁盘和将磁盘中的数据进行 merge。待数据 copy 完成之后,copy 阶段就完成了,开始进行 sort 阶段,sort 阶段主要是执行 finalMerge 操作,纯粹的 sort 阶段,完成之后就是 reduce 阶段,调用用户定义的 reduce 函数进行处理。

详细步骤

  1. Copy阶段拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求MapTask获取属于自己的文件。
  2. Merge阶段在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。
  3. Sort阶段:把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
  4. Reduce阶段:键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

3.3 Shuffle 过程


map 阶段处理的数据如何传递给 reduce 阶段,是 MapReduce 框架中最关键的一个流程,这个流程就叫 shuffle。

shuffle: 洗牌、发牌 ——(核心机制:数据分区,排序,分组,规约,合并等过程)

shuffle 是 Mapreduce 的核心,它分布在 Mapreduce 的 map 阶段和 reduce 阶段。一般把从 Map 产生输出开始到 Reduce 取得数据作为输入之前的过程称作 shuffle。

  1. Collect阶段:将 MapTask 的结果输出到默认大小为 100M 的环形缓冲区,保存的是 key/value,Partition 分区信息等。
  2. Spill阶段当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了 combiner,还会将有相同分区号和 key 的数据进行排序。
  3. Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个 MapTask 最终只产生一个中间数据文件。
  4. Copy阶段:ReduceTask 启动 Fetcher 线程到已经完成 MapTask 的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。
  5. Merge阶段:在 ReduceTask 远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。
  6. Sort阶段:在对数据进行合并的同时,会进行排序操作,由于 MapTask 阶段已经对数据进行了局部的排序,ReduceTask 只需保证 Copy 的数据的最终整体有效性即可。

Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快

缓冲区的大小可以通过参数调整, 参数:mapreduce.task.io.sort.mb 默认100M

 参考文档:

c​大数据计算引擎MapReduce框架详解 | 大数据技术分享

MapReduce的shuffle过程详解(分片、分区、合并、归并。。。)_mapreduce的shuffle流程_ASN_forever的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/180491.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux 系统编程,Binder 学习,文件访问相关的接口

文章目录 Linux 系统编程&#xff0c;Binder 学习&#xff0c;文件访问相关的接口1.概念2.linux文件结构3.文件描述符4.Linux文件系统的两类常用接口&#xff0c;linux系统内置库函数4.1 open4.2 close4.3 read4.4 write 5.标准I/O库函数5.1 fopen Linux 系统编程&#xff0c;B…

CCF ChinaSoft 2023 论坛巡礼|自动驾驶仿真测试论坛

2023年CCF中国软件大会&#xff08;CCF ChinaSoft 2023&#xff09;由CCF主办&#xff0c;CCF系统软件专委会、形式化方法专委会、软件工程专委会以及复旦大学联合承办&#xff0c;将于2023年12月1-3日在上海国际会议中心举行。 本次大会主题是“智能化软件创新推动数字经济与社…

C语言:简单的用二维数组打印杨氏三角

杨辉三角&#xff0c;又称帕斯卡三角&#xff0c;是一个数学上的规律图形。它的构造规则如下&#xff1a; 每一行的两个端点数字是1。从第三行开始&#xff0c;每个数字是它上方两个数字的和。每一行数字左右对称。 #include<stdio.h> int main() {int arr[50][50];//定…

深度学习之基于Pytorch和OCR的识别文本检测系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介深度学习与OCRPyTorch在OCR中的应用文本检测系统的关键组成部分1. 图像预处理2. 深度学习模型3. 文本检测算法4. 后处理 二、功能三、系统四. 总结 一项目简…

【机器学习7】优化算法

1 有监督学习的损失函数 1.1 分类问题 对二分类问题&#xff0c; Y{1,−1}&#xff0c; 我们希望sign f(xi,θ)yi&#xff0c; 最自然的损失函数是0-1损失&#xff0c; 函数定义特点0-1损失函数非凸、非光滑&#xff0c;很难直接对该函数进行优化Hinge损失函数当fy≥1时&…

MySQL(16):变量、流程控制与游标

变量 在MySQL数据库的存储过程和函数中&#xff0c;可以使用变量来存储查询或计算的中间结果数据&#xff0c;或者输出最终的结果数据。 在 MySQL 数据库中&#xff0c;变量分为 系统变量 以及 用户自定义变量 。 系统变量 变量由系统定义&#xff0c;不是用户定义&#xff…

RabbitMQ-高级篇-黑马程序员

代码&#xff1a; 链接&#xff1a; https://pan.baidu.com/s/1nQBIgB_SbzoKu_XMWZ3JoA?pwdaeoe 提取码&#xff1a;aeoe 在昨天的练习作业中&#xff0c;我们改造了余额支付功能&#xff0c;在支付成功后利用RabbitMQ通知交易服务&#xff0c;更新业务订单状态为已支付。 但…

二百零二、Hive——Hive解析JSON字段(单个字段与json数组)

一、目的 用Flume采集Kafka写入到Hive的ODS层在HDFS路径下的JSON数据&#xff0c;需要在DWD层进行解析并清洗 &#xff08;一&#xff09;Hive的ODS层建静态分区外部表 create external table if not exists ods_queue(queue_json string ) comment 静态排队数据表——静…

图像分类:弥合像素和理解之间的差距

一、介绍 在人工智能的广阔领域中&#xff0c;图像分类作为一种关键应用脱颖而出&#xff0c;它无缝地融合了计算机视觉和机器学习的复杂性。图像分类的核心是训练机器对数字图像中的对象或场景进行识别和分类。这项技术有着广泛的应用&#xff0c;从自动驾驶汽车和医疗诊断到社…

鸿蒙:从0到“Hello Harmony”

效果展示 一.概述 明年华为鸿蒙就不再兼容Android生态了&#xff0c;作为拥有7亿终端用户的华为&#xff0c;建立自己的生态也是理所当然。 所以对HarmonyOS的研究也是众多开发者绕不开的坎了。 今天这篇博文主要实现一个“Hello Harmony&#xff01;”的Demo。 二.官方链接…

算法-二分查找-简单-搜索插入位置

记录一下算法题的学习3 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 示例 1: 输入: nums [1,3,5,6], target 5 输出: 2示例 2: 输入: nums [1,3,5,6], …

电磁场与电磁波part1--矢量分析

目录 1、方向导数 2、散度定理&#xff08;高斯定理&#xff09; 3、散度与旋度的比较 4、旋度定理&#xff08;斯托克斯定理&#xff09; 5、关于点乘、叉乘、梯度、散度、旋度的计算 ~~~~~~~~~~~~~~~~~~~~~~~~ 确认过眼神&#xff0c;是我看不懂的 ~~~~~~~~~~~~~~~~…