员工信息全部存储在emp.csv文件中,员工的属性有:员工id、名称、职位、领导id、雇佣时间、工资、奖金、部门号。
在MapReduce中想要使用员工的信息,需要对员工进行序列化处理。因为MapReduce是一个分布式框架数据会在不同节点之间进行传输,所以需要将对象转换成字节序列以便于存储或传输。并且如果对象不序列化程序会出错。
一、主类
主类作用:在主类中设置MapReduce中的map类和reduce类,指定分区规则类、设置启动reduce的数量,设置map阶段和reduce阶段的输入输出类型。上传文件。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class EmployeeMain {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);//设置主类job.setJarByClass(EmployeeMain.class);//设置Map类job.setMapperClass(EmployeeMapper.class);//设置Reduce类job.setReducerClass(SalaryTotalReducer.class);//指定分区规则job.setPartitionerClass(DeptnoPartitioner.class);//设置启动reduce数量job.setNumReduceTasks(3);job.setMapOutputKeyClass(IntWritable.class);// map阶段的输出的keyjob.setMapOutputValueClass(Employee.class);// map阶段的输出的valuejob.setOutputKeyClass(IntWritable.class);// reduce阶段的输出的keyjob.setOutputValueClass(Employee.class);// reduce阶段的输出的value//Windows本地路径FileInputFormat.setInputPaths(job, new Path("./src/main/java/serialSortPartitioner/emp.csv"));FileOutputFormat.setOutputPath(job, new Path("./src/main/java/serialSortPartitioner/output"));System.out.println("计算开始---------------");boolean res = job.waitForCompletion(true);System.out.println("计算结束---------------");}}
二、员工类
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class Employee implements Writable{//员工idprivate int empno;//员工名称private String ename;//员工职位private String job;//直接领导的员工idprivate int mgr;//雇佣时间private String hiredate;//工资private int sal;//奖金private int comm;//部门号private int deptno;public Employee(){}//序列化public void write(DataOutput out) throws IOException {out.writeInt(this.empno);out.writeUTF(this.ename);out.writeUTF(this.job);out.writeInt(this.mgr);out.writeUTF(this.hiredate);out.writeInt(this.sal);out.writeInt(this.comm);out.writeInt(this.deptno);}//反序列化public void readFields(DataInput in) throws IOException {this.empno = in.readInt();this.ename = in.readUTF();this.job = in.readUTF();this.mgr = in.readInt();this.hiredate = in.readUTF();this.sal = in.readInt();this.comm = in.readInt();this.deptno = in.readInt();}public int getEmpno() {return empno;}public void setEmpno(int empno) {this.empno = empno;}public String getEname() {return ename;}public void setEname(String ename) {this.ename = ename;}public String getJob() {return job;}public void setJob(String job) {this.job = job;}public int getMgr() {return mgr;}public void setMgr(int mgr) {this.mgr = mgr;}public String getHiredate() {return hiredate;}public void setHiredate(String hiredate) {this.hiredate = hiredate;}public Integer getSal() {return sal;}public void setSal(int sal) {this.sal = sal;}public int getComm() {return comm;}public void setComm(int comm) {this.comm = comm;}public int getDeptno() {return deptno;}public void setDeptno(int deptno) {this.deptno = deptno;}@Overridepublic String toString() {return "Employee{" +"empno=" + empno +", ename='" + ename + '\'' +", job='" + job + '\'' +", mgr=" + mgr +", hiredate='" + hiredate + '\'' +", sal=" + sal +", comm=" + comm +", deptno=" + deptno +'}';}
}
三、map类
map类主要作用是输入员工的数据到MapReduce中。
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;public class EmployeeMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {//get values stringString v1string = v1.toString();//spile stringString words[] = v1string.split(",");//map out key/value//System.out.println("display this turn <key,1> ");Employee e = new Employee();//员工号e.setEmpno(Integer.parseInt(words[0]));//姓名e.setEname(words[1]);//职位e.setJob(words[2]);//老板号try {e.setMgr(Integer.parseInt(words[3]));} catch (Exception e1) {//没有老板号e.setMgr(-1);}//入职日期e.setHiredate(words[4]);//工资e.setSal(Integer.parseInt(words[5]));//奖金try {e.setComm(Integer.parseInt(words[6]));} catch (Exception e2) {e.setComm(0);}//部门号e.setDeptno(Integer.parseInt(words[7]));
// System.out.println("map " + e.toString());//根据部门号作为关键字,进行默认排序,也可以设置为空context.write(new IntWritable(e.getDeptno()), e);}@Overridepublic void run(Context context) throws IOException, InterruptedException {super.run(context);}
}
三、分区类
根据部门号进行分区
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;public class DeptnoPartitioner extends Partitioner<IntWritable, Employee> {//根据部门号设置分区@Overridepublic int getPartition(IntWritable k2, Employee v2, int numPartitions) {// TODO Auto-generated method stubif (v2.getDeptno() <= 10) {return 0;} else if (v2.getDeptno() <= 20) {return 1;} else return 2;}
}
四、reduce类
设置的分区数量和启动的reduce数量相同(在主类中设置启动数量),在reduce类中进行排序就可以实现每个分区进行自定义排序。
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.*;public class SalaryTotalReducer extends Reducer<IntWritable, Employee, NullWritable, Employee> {Comparator comparetor = new compareclass();//TreeMap对象可以使用降序的比较器private TreeMap<Integer, Employee> repToRecordMap =new TreeMap<Integer, Employee>(comparetor); //如果参数为空是默认升序比较器protected void reduce(IntWritable k3, Iterable<Employee> v3,Context context)throws IOException, InterruptedException {//在这里自定义排序for (Employee e : v3) {repToRecordMap.put(e.getSal(),e);}//在这里获取排序后的结果for (Integer e : repToRecordMap.keySet()) {//在这里工资数据会改变(原因未知),需要重新设置为原来的工资repToRecordMap.get(e).setSal(e);context.write(NullWritable.get(),repToRecordMap.get(e));}}
}class compareclass implements Comparator<Integer> {//返回一个基本类型的整型,谁大谁排后面(升序).//返回负数表示:o1 小于o2//返回0表示:表示:o1和o2相等//返回正数表示:o1大于o2。//默认用o1-o2,创建TreeMap对象时可以不用这个继承类,但是要降序,必须修改compare里面的逻辑o2-o1//谁大谁排在前面(降序)用o2-o1@Override//排序public int compare(Integer o1, Integer o2) {// TODO Auto-generated method stubreturn o1 - o2;}
}