【MapReduce】对员工数据按照部门分区并对每个分区排序

        员工信息全部存储在emp.csv文件中,员工的属性有:员工id、名称、职位、领导id、雇佣时间、工资、奖金、部门号。

        在MapReduce中想要使用员工的信息,需要对员工进行序列化处理。因为MapReduce是一个分布式框架数据会在不同节点之间进行传输,所以需要将对象转换成字节序列以便于存储或传输。并且如果对象不序列化程序会出错。

一、主类

主类作用:在主类中设置MapReduce中的map类和reduce类,指定分区规则类、设置启动reduce的数量,设置map阶段和reduce阶段的输入输出类型。上传文件。


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class EmployeeMain {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);//设置主类job.setJarByClass(EmployeeMain.class);//设置Map类job.setMapperClass(EmployeeMapper.class);//设置Reduce类job.setReducerClass(SalaryTotalReducer.class);//指定分区规则job.setPartitionerClass(DeptnoPartitioner.class);//设置启动reduce数量job.setNumReduceTasks(3);job.setMapOutputKeyClass(IntWritable.class);// map阶段的输出的keyjob.setMapOutputValueClass(Employee.class);// map阶段的输出的valuejob.setOutputKeyClass(IntWritable.class);// reduce阶段的输出的keyjob.setOutputValueClass(Employee.class);// reduce阶段的输出的value//Windows本地路径FileInputFormat.setInputPaths(job, new Path("./src/main/java/serialSortPartitioner/emp.csv"));FileOutputFormat.setOutputPath(job, new Path("./src/main/java/serialSortPartitioner/output"));System.out.println("计算开始---------------");boolean res = job.waitForCompletion(true);System.out.println("计算结束---------------");}}

二、员工类


import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class Employee implements Writable{//员工idprivate int empno;//员工名称private String ename;//员工职位private String job;//直接领导的员工idprivate int mgr;//雇佣时间private String hiredate;//工资private int sal;//奖金private int comm;//部门号private int deptno;public Employee(){}//序列化public void write(DataOutput out) throws IOException {out.writeInt(this.empno);out.writeUTF(this.ename);out.writeUTF(this.job);out.writeInt(this.mgr);out.writeUTF(this.hiredate);out.writeInt(this.sal);out.writeInt(this.comm);out.writeInt(this.deptno);}//反序列化public void readFields(DataInput in) throws IOException {this.empno = in.readInt();this.ename = in.readUTF();this.job = in.readUTF();this.mgr = in.readInt();this.hiredate = in.readUTF();this.sal = in.readInt();this.comm = in.readInt();this.deptno = in.readInt();}public int getEmpno() {return empno;}public void setEmpno(int empno) {this.empno = empno;}public String getEname() {return ename;}public void setEname(String ename) {this.ename = ename;}public String getJob() {return job;}public void setJob(String job) {this.job = job;}public int getMgr() {return mgr;}public void setMgr(int mgr) {this.mgr = mgr;}public String getHiredate() {return hiredate;}public void setHiredate(String hiredate) {this.hiredate = hiredate;}public Integer getSal() {return sal;}public void setSal(int sal) {this.sal = sal;}public int getComm() {return comm;}public void setComm(int comm) {this.comm = comm;}public int getDeptno() {return deptno;}public void setDeptno(int deptno) {this.deptno = deptno;}@Overridepublic String toString() {return "Employee{" +"empno=" + empno +", ename='" + ename + '\'' +", job='" + job + '\'' +", mgr=" + mgr +", hiredate='" + hiredate + '\'' +", sal=" + sal +", comm=" + comm +", deptno=" + deptno +'}';}
}

三、map类

map类主要作用是输入员工的数据到MapReduce中。


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;public class EmployeeMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {//get values stringString v1string = v1.toString();//spile stringString words[] = v1string.split(",");//map out key/value//System.out.println("display this turn <key,1> ");Employee e = new Employee();//员工号e.setEmpno(Integer.parseInt(words[0]));//姓名e.setEname(words[1]);//职位e.setJob(words[2]);//老板号try {e.setMgr(Integer.parseInt(words[3]));} catch (Exception e1) {//没有老板号e.setMgr(-1);}//入职日期e.setHiredate(words[4]);//工资e.setSal(Integer.parseInt(words[5]));//奖金try {e.setComm(Integer.parseInt(words[6]));} catch (Exception e2) {e.setComm(0);}//部门号e.setDeptno(Integer.parseInt(words[7]));
//        System.out.println("map   " + e.toString());//根据部门号作为关键字,进行默认排序,也可以设置为空context.write(new IntWritable(e.getDeptno()), e);}@Overridepublic void run(Context context) throws IOException, InterruptedException {super.run(context);}
}

三、分区类

根据部门号进行分区


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;public class DeptnoPartitioner extends Partitioner<IntWritable, Employee> {//根据部门号设置分区@Overridepublic int getPartition(IntWritable k2, Employee v2, int numPartitions) {// TODO Auto-generated method stubif (v2.getDeptno() <= 10) {return 0;} else if (v2.getDeptno() <= 20) {return 1;} else return 2;}
}

四、reduce类

设置的分区数量和启动的reduce数量相同(在主类中设置启动数量),在reduce类中进行排序就可以实现每个分区进行自定义排序。


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.*;public class SalaryTotalReducer extends Reducer<IntWritable, Employee, NullWritable, Employee> {Comparator comparetor = new compareclass();//TreeMap对象可以使用降序的比较器private TreeMap<Integer, Employee> repToRecordMap =new TreeMap<Integer, Employee>(comparetor); //如果参数为空是默认升序比较器protected void reduce(IntWritable k3, Iterable<Employee> v3,Context context)throws IOException, InterruptedException {//在这里自定义排序for (Employee e : v3) {repToRecordMap.put(e.getSal(),e);}//在这里获取排序后的结果for (Integer e : repToRecordMap.keySet()) {//在这里工资数据会改变(原因未知),需要重新设置为原来的工资repToRecordMap.get(e).setSal(e);context.write(NullWritable.get(),repToRecordMap.get(e));}}
}class compareclass implements Comparator<Integer> {//返回一个基本类型的整型,谁大谁排后面(升序).//返回负数表示:o1 小于o2//返回0表示:表示:o1和o2相等//返回正数表示:o1大于o2。//默认用o1-o2,创建TreeMap对象时可以不用这个继承类,但是要降序,必须修改compare里面的逻辑o2-o1//谁大谁排在前面(降序)用o2-o1@Override//排序public int compare(Integer o1, Integer o2) {// TODO Auto-generated method stubreturn o1 - o2;}
}

运行结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/409829.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ZooKeeper 实战(五) Curator实现分布式锁

文章目录 ZooKeeper 实战(五) Curator实现分布式锁1.简介1.1.分布式锁概念1.2.Curator 分布式锁的实现方式1.3.分布式锁接口 2.准备工作3.分布式可重入锁3.1.锁对象3.2.非重入式抢占锁测试代码输出日志 3.3.重入式抢占锁测试代码输出日志 4.分布式非可重入锁4.1.锁对象4.2.重入…

R语言【paleobioDB】——pbdb_orig_ext():绘制随着时间变化而出现的新类群

Package paleobioDB version 0.7.0 paleobioDB 包在2020年已经停止更新&#xff0c;该包依赖PBDB v1 API。 可以选择在Index of /src/contrib/Archive/paleobioDB (r-project.org)下载安装包后&#xff0c;执行本地安装。 Usage pbdb_orig_ext (data, rank, temporal_extent…

Spark---累加器和广播变量

文章目录 1.累加器实现原理2.自定义累加器3.广播变量 1.累加器实现原理 累加器用来把 Executor 端变量信息聚合到 Driver 端。在 Driver 程序中定义的变量&#xff0c;在Executor 端的每个 Task 都会得到这个变量的一份新的副本&#xff0c;每个 task 更新这些副本的值后&…

【windows】右键添加git bash here菜单

在vs 里安装了git for windows 后&#xff0c;之前git-bash 右键菜单消失了。难道是git for windows 覆盖了原来自己安装的git &#xff1f;大神给出解决方案 手动添加Git Bash Here到右键菜单&#xff08;超详细&#xff09; 安装路径&#xff1a;我老的 &#xff1f; vs的gi…

Spring5深入浅出篇:Spring工厂设计模式拓展应用

Spring5深入浅出篇:Spring工厂设计模式拓展应用 简单工厂实现 这里直接上代码举例子 UserService.java public interface UserService {public void register(User user);public void login(String name, String password); }UserServiceImpl.java public class UserService…

Netty-Netty源码分析

Netty线程模型图 Netty线程模型源码剖析图 Netty高并发高性能架构设计精髓 主从Reactor线程模型NIO多路复用非阻塞无锁串行化设计思想支持高性能序列化协议零拷贝(直接内存的使用)ByteBuf内存池设计灵活的TCP参数配置能力并发优化 无锁串行化设计思想 在大多数场景下&#…

【计算机网络】网络层——详解IP协议

个人主页&#xff1a;兜里有颗棉花糖 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 兜里有颗棉花糖 原创 收录于专栏【网络编程】 本专栏旨在分享学习计算机网络的一点学习心得&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 目录 &#x1f431;一、I…

【MATLAB源码-第113期】基于matlab的孔雀优化算法(POA)机器人栅格路径规划,输出做短路径图和适应度曲线。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 POA&#xff08;孔雀优化算法&#xff09;是一种基于孔雀羽毛开屏行为启发的优化算法。这种算法模仿孔雀通过展开其色彩斑斓的尾羽来吸引雌性的自然行为。在算法中&#xff0c;每个孔雀代表一个潜在的解决方案&#xff0c;而…

linux驱动(六):input(key)

本文主要探讨210的input子系统。 input子系统 input子系统包含:设备驱动层,输入核心层,事件驱动层 事件处理层&#xff1a;接收核心层上报事件选择对应struct input_handler处理,每个input_handler对象处理一类事件,同类事件的设备驱动共用同一handler …

TCP连接TIME_WAIT

TCP断开过程: TIME_WAIT的作用: TIME_WAIT状态存在的理由&#xff1a; 1&#xff09;可靠地实现TCP全双工连接的终止 在进行关闭连接四次挥手协议时&#xff0c;最后的ACK是由主动关闭端发出的&#xff0c;如果这个最终的ACK丢失&#xff0c;服务器将重发最终的FIN&#xf…

Unity与Android交互通信系列(4)

上篇文章我们实现了模块化调用&#xff0c;运用了模块化设计思想和简化了调用流程&#xff0c;本篇文章讲述UnityPlayerActivity类的继承和使用。 在一些深度交互场合&#xff0c;比如Activity切换、程序启动预处理等&#xff0c;这时可能会需要继承Application和UnityPlayerAc…

【目标检测】YOLOv7算法实现(一):模型搭建

本系列文章记录本人硕士阶段YOLO系列目标检测算法自学及其代码实现的过程。其中算法具体实现借鉴于ultralytics YOLO源码Github&#xff0c;删减了源码中部分内容&#xff0c;满足个人科研需求。   本篇文章在YOLOv5算法实现的基础上&#xff0c;进一步完成YOLOv7算法的实现。…