MR源码解析
- new Job(): 读取本地文件, xml配置
- job.start(): 启动线程
- job的run():线程方法
- runTasks(): 传入对应的接口,启动map或者reduce
- MapTask类的run(): 设置map阶段的参数,初始化任务,创建上下文对象
- 创建读取器LineRecordReader
- 判断是否压缩 compressFactory
- 如果没有压缩,使用seek方法
- mapTask的write(),进行溢写
- mapper类的init()方法,设置溢写百分比和缓冲区大小
- collector收集器:进行map阶段数据类型检查和分数数量检查
- keySerializer: 进行数据的序列化,调用自己写的bean对象
- kvmeta.put(): 写入环形缓冲区
- mapPhase结束
- 数据量达到缓冲区的80%,对索引进行快速排序
- input.close():关闭输入
- 关闭输出并同时将缓冲区数据按照分区写入磁盘。
- 如果开启了combine,进行数据合并
- mergePart:归并分区
- combine第二次合并,如果溢写次数小于3就不合并了
- collector.close():关闭环形缓冲区
- reduceTask的run方法
- submit: 5个reduce并行提交
- cLeanTask:初始化
- shuffle类:map的排序,recuce中的归并排序
- Merger合并器:两次归并排序,先内存归并,后磁盘归并
- 抓取数据:可以从本地或者网络中抓取
- sort :归并排序
- reduce阶段:
- 创建上下文对象
- 调用reducer的run方法
- real.write(): LineRecordWrite写入HDFS
使用MR来进行拷贝去重
- 拷贝:values写入上下文时需要迭代遍历
- 去重:values写入上下文时不遍历
使用MR来实现join操作
- 实现TableBean类,四个属性,空参构造器,get-set方法
- write():序列化
- out.writeUTF():该方法有换行,不会连在一起
- readFields(): 反序列化
- write():序列化
- 实现mapper类
- setup()
- 使用context上下文对象获取InputSplit类
- 强制类型转换为FileSplit类
- getPath().getName()获取文件名称
- map()
- 切分split
- 封装
- context写出
- setup()
public class TableMapper extends Mapper<LongWritable, Text, Text,TableBean> {private String filename;private Text outK;private TableBean outV;//初始化,每个文件开始一次maptask,并进行一次初始化//获取到文件的名称@Overrideprotected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {//拿到切片信息FileSplit split = (FileSplit) context.getInputSplit();filename = split.getPath().getName();outK = new Text();outV = new TableBean();}@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {//1. 获取一行String line = value.toString();//2.判断是哪个文件的if(filename.contains("order")){//处理的是订单表String[] split = line.split("\t");//封装outK.set(split[1]);//pid作为keyoutV.setId(split[0]);outV.setPid(split[1]);outV.setAmount(Integer.parseInt(split[2]));outV.setTableName("order");outV.setPname("");}else{//处理的是商品表String[] split = line.split(" ");
// System.out.println("=========> " + Arrays.toString(split)+" <=========");
// System.out.println("=========> " + split[1] +" <=========");//封装outK.set(split[0]);//pid作为keyoutV.setId("");outV.setPid(split[0]);outV.setAmount(0);outV.setTableName("pd");outV.setPname(split[1]);}//写出context.write(outK, outV);}
}
- 实现reduce类
- 为了分辨map传递过来的数据是哪个表,给bean对象添加一个表名属性
- 在mapper类中给对应表的抓取过程中添加标记
- 在获取到value时不能直接使用等于号进行赋值,values是Iterable集合,比较特殊
- 属性赋值工具类
BeanUtils.copyProperties(dest, src);
public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {private ArrayList<TableBean> orderBeans;private TableBean pdBean;@Overrideprotected void setup(Reducer<Text, TableBean, TableBean, NullWritable>.Context context) throws IOException, InterruptedException {//1.创建集合orderBeans = new ArrayList<>();pdBean = new TableBean();}@Overrideprotected void reduce(Text key, Iterable<TableBean> values, Reducer<Text, TableBean, TableBean, NullWritable>.Context context) throws IOException, InterruptedException {orderBeans.clear();//清空集合//2.遍历赋值for (TableBean value : values) {if ("order".equals(value.getTableName())) {TableBean temp = new TableBean();try {BeanUtils.copyProperties(temp,value);} catch (IllegalAccessException e) {throw new RuntimeException(e);} catch (InvocationTargetException e) {throw new RuntimeException(e);}orderBeans.add(temp);} else {//商品表try {BeanUtils.copyProperties(pdBean, value);} catch (IllegalAccessException e) {throw new RuntimeException(e);} catch (InvocationTargetException e) {throw new RuntimeException(e);}}}//循环遍历orderBeans,赋值pdnamefor (TableBean orderBean : orderBeans) {orderBean.setPname(pdBean.getPname());context.write(orderBean,NullWritable.get());}}
}
总结:这种写法,在reduce阶段创建了对象和集合,这些方式都是比较消耗资源的,容易造成数据倾斜问题。
MR在环形缓冲区快排时倒排索引,反向溢写,会导致数据反向输出,类似栈结构的的先进后出。