MapReduce解析:从定义到核心思想,编程规范与序列化解读

目录

  • 一、 MapReduce
    • 1.1 MapReduce定义
    • 1.2 MapReduce优缺点
      • 1.2.1 优点
      • 1.2.2 缺点
    • 1.3 MapReduce核心思想
    • 1.4 MapReduce进程
    • 1.5 常用数据序列化类型
    • 1.6 MapReduce编程规范
      • 1.6.1Mapper阶段
      • 1.6.2 Reduce阶段
      • 1.6.3 Driver阶段
    • 1.7 WordCount案例实操
      • 1.7.1 本地测试
      • 1.7.2 提交到集群测试
  • 二、 Hadoop序列化
    • 2.1 序列化概述
    • 2.2 自定义bean对象实现序列化接口(Writable)
    • 2.3 序列化案例实操

一、 MapReduce

1.1 MapReduce定义

MapReduce是一种用于处理大规模数据集的编程模型和分布式运算程序的编程框架。它最初由Google公司开发,在后来成为了Apache Hadoop项目的核心组件之一。MapReduce的核心思想是将一个大的计算任务分解为多个可以并行执行的小任务,并通过将数据并行处理来实现高效的大规模数据处理。它适用于分布式环境下的数据处理,可以在大规模集群上并行执行计算任务,从而提高处理速度和可扩展性。

1.2 MapReduce优缺点

1.2.1 优点

  • 简化编程模型
    MapReduce提供了简单的编程接口,开发人员只需实现map和reduce函数即可,无需关注底层的并行和分布式细节。只需简单的实现一些接口,就可以完成一个分布式程序。
  • 高可扩展性
    MapReduce允许将大规模数据集分布在集群中进行处理,从而实现了高度可扩展性。可以通过增加集群中的节点数量来增加处理能力。
  • 高容错性
    MapReduce具有容错机制,当某个节点发生故障时,可以自动重新分配任务到其他可用节点上,保证任务的完成。
  • 适用于大规模数据集
    MapReduce适用于处理大规模的数据集,可以在分布式环境下高效地处理海量数据。

1.2.2 缺点

  • 高延迟
    由于MapReduce是批处理模型,需要等待所有任务完成后才能得到结果,因此对于实时性要求较高的场景不太适用。
  • 适用性限制
    MapReduce适用于批处理任务,但不适用于需要实时响应和交互式查询的场景。它主要适用于离线数据分析、批量处理和一次性计算等应用。
  • 数据倾斜
    在某些情况下,由于数据分布的不均匀,输入的数据可能会导致Reduce任务之间的负载不均衡,一些任务可能会比其他任务运行更长时间,从而影响整体性能。
  • 不擅长DAG(有向无环图)计算
    多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

1.3 MapReduce核心思想

MapReduce的核心思想是将一个大的计算任务分解为多个可以并行执行的小任务,并通过将数据并行处理来实现高效的大规模数据处理。
具体而言,MapReduce模型包含两个主要的阶段:Map阶段和Reduce阶段。

在这里插入图片描述

在Map阶段,原始数据集会被划分成多个拆分的数据块,每个数据块会被分配给一个Map任务进行处理。Map任务接收输入数据,并将其转化为一系列中间键值对(key-value pairs)。

在Reduce阶段,Map任务输出的中间键值对会按照键进行排序和分区,并将相同键的键值对发送到同一个Reduce任务。Reduce任务负责对属于自己的中间键值对进行聚合并生成最终结果。

通过这种方式,MapReduce利用数据的并行处理能力,实现了高效的大规模数据处理和分析。每个小任务(Map任务)可以独立地处理自己分配到的数据块,而Reduce任务可以并行地对各个Map任务的输出进行聚合,从而加速整个计算过程。

注意:
1)第一个阶段的MapTask并发实例,完全并行运行,互不相干。
2)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
3)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。

1.4 MapReduce进程

一个完整的MapReduce程序在分布式运行时有三类实例进程:
(1)MrAppMaster:负责整个程序的过程调度及状态协调。
(2)MapTask:负责Map阶段的整个数据处理流程。
(3)ReduceTask:负责Reduce阶段的整个数据处理流程。

1.5 常用数据序列化类型

Java类型Hadoop Writable类型
BooleanBooleanWritable
ByteByteWritable
IntIntWritable
FloatFloatWritable
LongLongWritable
DoubleDoubleWritable
StringText
MapMapWritable
ArrayArrayWritable
NullNullWritable

1.6 MapReduce编程规范

用户编写的程序分成三个部分:Mapper、Reducer和Driver。

1.6.1Mapper阶段

WordCount官方代码截图:
在这里插入图片描述

  • 用户自定义的Mapper要继承自己的父类Mapper
  • Mapper的输入输出是KV键值对的形式(KV的类型可自定义)
  • Mapper中的业务逻辑是写在map()方法中
  • Mapper的输出数据是KV键值对的形式(KV的类型可自定义)
  • map()方法对每一个<K,V>调用一次

1.6.2 Reduce阶段

WordCount官方代码截图:
在这里插入图片描述

  • 用户自定义的Reducer要继承自己的父类
  • Reducer的输入数据类型对应Mapper的输出数据类型
  • Reducer的业务逻辑写在reduce()方法中
  • ReduceTask进程对每一组相K的<K,V>组调用一次reduce()方法

1.6.3 Driver阶段

WordCount官方代码截图:在这里插入图片描述
相当于Yarn集群客户端,用于提交我们整个程序到Yarn集群,提交的是封装了MapReduce程序相关运行参数的job对象。

1.7 WordCount案例实操

1.7.1 本地测试

(1)创建maven工程,并在pom.xml文件中添加如下依赖

<dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.2.4</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency>
</dependencies>

(2)在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。

log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  
log4j.appender.logfile=org.apache.log4j.FileAppender  
log4j.appender.logfile.File=target/spring.log  
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

(3)编写Mapper类

package com.amxl.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{Text k = new Text();IntWritable v = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException {// 1 获取一行String line = value.toString();// 2 切割String[] words = line.split(" ");// 3 输出for (String word : words) {k.set(word);context.write(k, v);}}
}

(2)编写Reducer类

package com.amxl.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{int sum;
IntWritable v = new IntWritable();@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {// 1 累加求和sum = 0;for (IntWritable count : values) {sum += count.get();}// 2 输出v.set(sum);context.write(key,v);}
}

(3)编写Driver驱动类

package com.amxl.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WordCountDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 1 获取配置信息以及获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2 关联本Driver程序的jarjob.setJarByClass(WordCountDriver.class);// 3 关联Mapper和Reducer的jarjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);// 4 设置Mapper输出的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 5 设置最终输出kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 6 设置输入和输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7 提交jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

(4)本地测试

  • 需要首先配置好HADOOP_HOME变量以及Windows运行依赖
  • 在IDEA/Eclipse上运行程序

1.7.2 提交到集群测试

集群上测试
(1)用maven打jar包,需要添加的打包插件依赖

<build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.6.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

(2)将程序打成jar包
(3)修改不带依赖的jar包名称为wc.jar,并拷贝该jar包到Hadoop集群的/opt/module/hadoop-3.2.4路径。
(4)启动Hadoop集群

[amo@hadoop102 hadoop-3.2.4]sbin/start-dfs.sh
[amo@hadoop103 hadoop-3.2.4]$ sbin/start-yarn.sh

(5)执行WordCount程序

[amo@hadoop102 hadoop-3.2.4]$ hadoop jar  wc.jarcom.amxl.mapreduce.wordcount.WordCountDriver /user/amo/input /user/amo/output

二、 Hadoop序列化

2.1 序列化概述

1)什么是序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
2)为什么要序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
3)为什么不用Java的序列化
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。
4)Hadoop序列化特点:

  • 紧凑 :高效使用存储空间。
  • 快速:读写数据的额外开销小。
  • 互操作:支持多语言的交互

2.2 自定义bean对象实现序列化接口(Writable)

在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。
具体实现bean对象序列化步骤如下7步。
(1)必须实现Writable接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public FlowBean() {super();
}

(3)重写序列化方法

@Override
public void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);
}

(4)重写反序列化方法

@Override
public void readFields(DataInput in) throws IOException {upFlow = in.readLong();downFlow = in.readLong();sumFlow = in.readLong();
}

(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用。
(7)如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。详见后面排序案例。

@Override
public int compareTo(FlowBean o) {// 倒序排列,从大到小return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

2.3 序列化案例实操

1)编写MapReduce程序
(1)编写流量统计的Bean对象

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;//1 继承Writable接口
public class FlowBean implements Writable {private long upFlow; //上行流量private long downFlow; //下行流量private long sumFlow; //总流量//2 提供无参构造public FlowBean() {}//3 提供三个参数的getter和setter方法public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}public void setSumFlow() {this.sumFlow = this.upFlow + this.downFlow;}//4 实现序列化和反序列化方法,注意顺序一定要保持一致@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeLong(upFlow);dataOutput.writeLong(downFlow);dataOutput.writeLong(sumFlow);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.upFlow = dataInput.readLong();this.downFlow = dataInput.readLong();this.sumFlow = dataInput.readLong();}//5 重写ToString@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}
}

(2)编写Mapper类

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {private Text outK = new Text();private FlowBean outV = new FlowBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//1 获取一行数据,转成字符串String line = value.toString();//2 切割数据String[] split = line.split("\t");//3 抓取我们需要的数据:手机号,上行流量,下行流量String phone = split[1];String up = split[split.length - 3];String down = split[split.length - 2];//4 封装outK outVoutK.set(phone);outV.setUpFlow(Long.parseLong(up));outV.setDownFlow(Long.parseLong(down));outV.setSumFlow();//5 写出outK outVcontext.write(outK, outV);}
}

(3)编写Reducer类

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {private FlowBean outV = new FlowBean();@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {long totalUp = 0;long totalDown = 0;//1 遍历values,将其中的上行流量,下行流量分别累加for (FlowBean flowBean : values) {totalUp += flowBean.getUpFlow();totalDown += flowBean.getDownFlow();}//2 封装outKVoutV.setUpFlow(totalUp);outV.setDownFlow(totalDown);outV.setSumFlow();//3 写出outK outVcontext.write(key,outV);}
}

(4)编写Driver驱动类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;public class FlowDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//1 获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);//2 关联本Driver类job.setJarByClass(FlowDriver.class);//3 关联Mapper和Reducerjob.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);//4 设置Map端输出KV类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//5 设置程序最终输出的KV类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//6 设置程序的输入输出路径FileInputFormat.setInputPaths(job, new Path("D:\\inputflow"));FileOutputFormat.setOutputPath(job, new Path("D:\\flowoutput"));//7 提交Jobboolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/539684.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

论文阅读——RingMo

RingMo: A Remote Sensing Foundation Model With Masked Image Modeling 与自然场景相比&#xff0c;RS图像存在以下困难。 1&#xff09;分辨率和方位范围大&#xff1a;受遥感传感器的影响&#xff0c;图像具有多种空间分辨率。此外&#xff0c;与自然图像的实例通常由于重…

接上一篇:分布式调用链追踪系统设计

所以必须得记录父子关系&#xff1a; A---->B 是 B---->C 的父调用 A---->D 是 D---->E 的父调用 A---->D 还是 D---->F 的父调用 如何记录呢&#xff1f;需要给每个调用分配一个ID (称为 SpanID)&#xff0c;并且把这个 ID 传递给子调用&#xff0c; 子…

2024.3.14jsp

一、实验目的 1、安装配置JSP运行环境 2、设置web服务目录&#xff0c;修改TomCAT服务器的端口号、访问web服务目录下的jsp页面。 二、实验项目内容&#xff08;实验题目&#xff09; 1、编写两个简单的JSP页面&#xff1b;参考第一章上机实验1、2 &#xff08;1&#xff09…

html--bug

文章目录 html html <!DOCTYPE html> <html><head><meta charset"UTF-8"><title>老师</title><style>body {background-color: #008000;margin: 0px;cursor: none;overflow: hidden;}</style></head><bod…

Jmeter+ant,ant安装与配置

1.ant含义 ant&#xff1a;Ant翻译过来是蚂蚁的意思&#xff0c;在我们做接口测试的时候&#xff0c;是可以用来做JMeter接口测试生成测试报告的工具 2.ant下载 下载地址&#xff1a;Apache Ant - Ant Manual Distributions download中选择ant 下载安装最新版zip文件 3.…

JVM及垃圾回收算法

一、JVM 1、jvm的内存组成 五大内存区域&#xff0c;分1.7和1.8 1.堆内存&#xff1a;引用类型的数据&#xff0c;内部组成&#xff1a;1.新生代&#xff08;伊甸区和幸存者区&#xff09;2.老年代。该区域经常发生垃圾回收的操作 堆是JVM中最大的一块内存区域&#xff0c;用…

installation of package ‘RDocumentation’ had non-zero exit status

installation of package ‘RDocumentation’ had non-zero exit status Warning in install.packages :installation of package ‘httr’ had non-zero exit status Warning in install.packages :installation of package ‘openssl’ had non-zero exit status 由于项目需…

苹果Find My App用处多多,产品认准伦茨科技ST17H6x芯片

苹果发布AirTag发布以来&#xff0c;大家都更加注重物品的防丢&#xff0c;苹果的 Find My 就可以查找 iPhone、Mac、AirPods、Apple Watch&#xff0c;如今的Find My已经不单单可以查找苹果的设备&#xff0c;随着第三方设备的加入&#xff0c;将丰富Find My Network的版图。产…

sqllab第十九关通关笔记

知识点&#xff1a; 错误注入 最大长度为32位&#xff1b;如果目标长度>32时&#xff0c;需要利用截取函数进行分段读取referer注入 insert语句update语句 通过admin admin进行登录发现页面打印除了referer字段的信息 这应该是一个referer注入 首先进行测试一下 构造payl…

Python基于 opencv 的人脸识别考勤系统(V1.0),附源码

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12W、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…

先验分布、后验分布、极大似然的一点思考

今天和组里同事聊天的时候&#xff0c;无意中提到了贝叶斯统计里先验分布、后验分布、以及极大似然估计这三个概念。同事专门研究如何利用条件概率做系统辨识的&#xff0c;给我画了一幅图印象非常深刻&#xff1a; 其中k表示时序关系。上面这个图表示后验分布是由先验分布与似…

flutter环境搭建实践

Dart Dart 是一种客户端和服务器端的编程语言&#xff0c;最早由 Google 提出。它被设计用于构建高性能、高度可伸缩和可靠的应用程序。Dart 可以编译成本地代码或者在虚拟机中直接运行。在移动应用开发中&#xff0c;Dart 主要用于开发 Flutter 应用。 Flutter 和 Dart 的关…