目录
- 一、前言
- 二、准备数据
- 三、编程实现
- 3.1、统计出搜索过包含有“仙剑奇侠传”内容的UID及搜索关键字记录
- 3.2、统计rank<3并且order>2的所有UID及数量
- 3.3、上午7-9点之间,搜索过“赶集网”的用户UID
- 3.4、通过Rank:点击排名 对数据进行排序
- 四、参考
一、前言
最近学习大数据的知识,需要做一些有关Hadoop MapReduce
的实验
实验内容是在sogou.500w.utf8
数据的基础上进行的。
实现以下内容:
- 1、统计出搜索过包含有“仙剑奇侠传”内容的UID及搜索关键字记录
- 2、统计rank<3并且order>2的所有UID及数量
- 3、上午7-9点之间,搜索过“赶集网”的用户UID
- 4、通过Rank:点击排名 对数据进行排序
该实验是在已经搭建好Hadoop集群的基础上进行的,如果还没有搭建,请参考以下文章进行集群搭建
二、准备数据
数据的字段说明
上传数据
创建目录
hdfs dfs -mkdir /homework
上传文件
hdfs dfs -put -p sogou.500w.utf8 /homework
三、编程实现
3.1、统计出搜索过包含有“仙剑奇侠传”内容的UID及搜索关键字记录
1、Mapper
package com.csust.homework1;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class TaskMapper1 extends Mapper<LongWritable, Text, Text, Text> {Text outputK = new Text();Text outputV = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {String line = value.toString();String[] words = line.split("\t");if (words[2].contains("仙剑奇侠传")) {outputK.set(words[1]);outputV.set(words[2]);context.write(outputK, outputV);}}
}
2、Reduce
package com.csust.homework1;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class TaskReducer1 extends Reducer<Text,Text,Text,Text> {Text outputV = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {StringBuilder total= new StringBuilder();for (Text value : values) {total.append(value.toString());total.append("\t");}outputV.set(total.toString());context.write(key,outputV);}
}
3、Driver
package com.csust.homework1;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class TaskDriver1 {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//1:创建一个job任务对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);//如果打包运行出错,则需要加该配置job.setJarByClass(TaskDriver1.class);job.setMapperClass(TaskMapper1.class);job.setReducerClass(TaskReducer1.class);//设置Mapper输出job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);//设置最终输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//设置路径FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000/homework"));FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/homework1_out"));//提交任务boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}
4、程序运行
5、运行结果
3.2、统计rank<3并且order>2的所有UID及数量
1、Mapper
package com.csust.homework2;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class TaskMapper2 extends Mapper<LongWritable, Text, Text, IntWritable> {Text outK = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {String data = value.toString();String[] words = data.split("\t");if (Integer.parseInt(words[3])<3 && Integer.parseInt(words[4])>2) {outK.set(words[1]); context.write(outK, new IntWritable(1)); }}
}
2、Reduce
package com.csust.homework2;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class TaskReducer2 extends Reducer<Text, IntWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {int total = 0;for (IntWritable value : values) {total += value.get();}context.write(key,new IntWritable(total));}
}
3、Driver
package com.csust.homework2;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class TaskDriver2 {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//创建一个job任务对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);//如果打包运行出错,则需要加该配置job.setJarByClass(TaskDriver2.class);// 关联Mapper和Reducer的jarjob.setMapperClass(TaskMapper2.class);job.setReducerClass(TaskReducer2.class);//设置Mapper输出的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);//设置最终输出kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);//设置输入和输出路径FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000/homework"));FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/homework2_out"));//提交任务boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}
4、运行程序
5、运行结果
3.3、上午7-9点之间,搜索过“赶集网”的用户UID
1、Mapper
package com.csust.homework3;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;public class TaskMapper3 extends Mapper<LongWritable, Text, Text, Text> {Text outputK = new Text();Text outputV = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {String line = value.toString();String[] words = line.split("\t");SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");try {Date time = format.parse(words[0]);Calendar calendar = Calendar.getInstance();calendar.setTime(time);int hour = calendar.get(Calendar.HOUR_OF_DAY);if (hour >= 7 && hour < 9) {if (words[2].contains("赶集网")) {//UIDoutputK.set(words[1]);outputV.set(words[2]);context.write(outputK, outputV);}}} catch (ParseException e) {e.printStackTrace();}}
}
2、Reduce
package com.csust.homework3;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class TaskReducer3 extends Reducer<Text,Text,Text,Text> {Text outputV = new Text();@Overrideprotected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {StringBuilder total= new StringBuilder();for (Text value : values) {total.append(value.toString());total.append("\t");}outputV.set(total.toString());context.write(key,outputV);}
}
3、Driver
package com.csust.homework3;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class TaskDriver3 {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//创建一个job任务对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);//如果打包运行出错,则需要加该配置job.setJarByClass(TaskDriver3.class);// 关联Mapper和Reducer的jarjob.setMapperClass(TaskMapper3.class);job.setReducerClass(TaskReducer3.class);//设置Mapper输出的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);//设置最终输出kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//设置输入和输出路径FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000/homework"));FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/homework3_out"));//提交任务boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}
4、 运行程序
5、运行结果
3.4、通过Rank:点击排名 对数据进行排序
1、Mapper
package com.csust.sort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class TaskMapper4 extends Mapper<LongWritable, Text, IntWritable, Text> {Text outputV = new Text();@Overrideprotected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context) throws IOException, InterruptedException {String line = value.toString();String[] words = line.split("\t");IntWritable outputK = new IntWritable(Integer.parseInt(words[3])); List<String> list = new ArrayList<String>(Arrays.asList(words));list.remove(3);String data = String.join("\t", list); outputV.set(data);context.write(outputK, outputV); }
}
2、Reduce
package com.csust.sort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class TaskReducer4 extends Reducer<IntWritable, Text, Text, IntWritable> {@Overrideprotected void reduce(IntWritable key, Iterable<Text> values, Reducer<IntWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {for (Text value : values) {context.write(value, key); }}
}
3、Driver
package com.csust.sort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
public class TaskDriver4 {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//创建一个job任务对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);//如果打包运行出错,则需要加该配置job.setJarByClass(TaskDriver4.class);// 关联Mapper和Reducer的jarjob.setMapperClass(TaskMapper4.class);job.setReducerClass(TaskReducer4.class);//设置Mapper输出的kv类型job.setMapOutputKeyClass(IntWritable.class);job.setMapOutputValueClass(Text.class);//设置最终输出kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 设置排序方式job.setSortComparatorClass(MyComparator.class);//设置输入和输出路径FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000/homework"));FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000/homework4_out"));//提交任务boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}
4、Commparator
package com.csust.sort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
public class MyComparator extends IntWritable.Comparator{public int compare(WritableComparable a, WritableComparable b) {return -super.compare(a, b);}public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return -super.compare(b1, s1, l1, b2, s2, l2);}
}
5、运行程序
6、 运行结果
四、参考
集群搭建
MapReduce实现单词统计
MapReduce统计手机号