读取任务一中序列文件,统计每个用户每天的访问次数,最终将2021/1和2021/2的数据分别输出在两个文件中。
一、创建项目步骤:
1.创建项目
2.修改pom.xml文件
<packaging>jar</packaging>
<dependencies>
  <!-- Hadoop client API used by the Mapper/Reducer/Driver classes -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.4</version>
  </dependency>
  <!-- Test-only dependency: keep it off the runtime classpath and out of the assembled jar -->
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
  </dependency>
  <!-- SLF4J binding that routes logging to log4j (configured by log4j.properties) -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.30</version>
  </dependency>
</dependencies>
<build>
  <plugins>
    <!-- Compile for Java 8 -->
    <plugin>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
    <!-- Build an additional "jar-with-dependencies" artifact during the package phase -->
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <!-- Without an explicit goal the execution binds nothing and the assembly is never built -->
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
3.在resources目录下创建日志文件log4j.properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=D:\\countLog.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
二、编写程序:
1.自定义键的类型 MemberLogTime,包含两个属性(memberId、logTime),实现WritableComparable接口
2.编写Mapper模块:(在Mapper中使用计数器,计数器通过枚举定义)
package com.maidu.countlog;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper stage of the log-count job.
 *
 * <p>Treats the input key as the member ID and the input value as the access time,
 * and emits a {@code (MemberLogTime, 1)} pair for every input record. It also
 * maintains one enum-based counter per month.
 *
 * @author:yt
 * @since:2024-05-15
 */
public class LogCountMapper extends Mapper<Text, Text, MemberLogTime, IntWritable> {

    /** Output key instance reused across map() calls. */
    private final MemberLogTime outKey = new MemberLogTime();

    /** Constant output value: each record counts as one visit. */
    private final IntWritable countOfOne = new IntWritable(1);

    /** Enum-based counters, one per month of input records. */
    enum LogCounter {
        January,
        February
    }

    /**
     * Counts the record under its month counter and emits it with a count of one.
     *
     * @param key     input key, used as the member ID
     * @param value   input value, used as the access time
     * @param context task context used for counters and output
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        String accessTime = value.toString();

        // NOTE(review): the substring test also matches 2021/10 - 2021/12, and every
        // non-matching record is counted as February; this relies on the input holding
        // only January and February 2021 data - confirm against the upstream job.
        LogCounter month = accessTime.contains("2021/1")
                ? LogCounter.January
                : LogCounter.February;
        context.getCounter(month).increment(1);

        // Store the member ID and access time in the composite key.
        outKey.setMemberId(key.toString());
        outKey.setLogTime(accessTime);
        context.write(outKey, countOfOne);
    }
}
3.编写Combiner:
package com.maidu.countlog;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Combiner for the log-count job: adds up the counts received for one key
 * and emits a single {@code (key, sum)} pair.
 *
 * @author:yt
 * @since:2024-05-15
 */
public class LogCoutCombiner
        extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {

    /**
     * Sums all values of {@code key} and writes the total.
     *
     * @param key     composite key the counts belong to
     * @param values  counts to add up
     * @param context task context used to emit the total
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(MemberLogTime key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable partial : values) {
            total = total + partial.get();
        }
        IntWritable combined = new IntWritable(total);
        context.write(key, combined);
    }
}
4.编写Partitioner:
package com.maidu.countlog;import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Partitioner;/*** @author:yt* @since:2024-05-15*/ public class LogCountPartitioner extends Partitioner<MemberLogTime, IntWritable> {@Overridepublic int getPartition(MemberLogTime memberLogTime, IntWritable intWritable, int numPartitions) {String date = memberLogTime.getLogTime();if( date.contains( "2021/1") ){return 0%numPartitions;}else{return 1%numPartitions;}} }
5.编写Reduce模块:
package com.maidu.countlog;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer for the log-count job: adds up the counts of each key, emits
 * {@code (key, sum)}, and tracks the number of output keys per month with
 * dynamic (string-named) counters.
 *
 * @author:yt
 * @since:2024-05-15
 */
public class LogCountReducer
        extends Reducer<MemberLogTime, IntWritable, MemberLogTime, IntWritable> {

    /** Counter group shown in the job report (spelling corrected from "OuputCounter"). */
    private static final String COUNTER_GROUP = "OutputCounter";

    /** Counter of result keys whose log time contains "2021/1". */
    private static final String JANUARY_COUNTER = "JanuaryResult";

    /** Counter of all remaining result keys (spelling corrected from "FabruaryResult"). */
    private static final String FEBRUARY_COUNTER = "FebruaryResult";

    /** Output value instance reused across reduce() calls. */
    private final IntWritable result = new IntWritable();

    /**
     * Increments the month counter for {@code key}, then sums its values and writes the total.
     *
     * @param key     composite key the counts belong to
     * @param values  counts to add up
     * @param context task context used for counters and output
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(MemberLogTime key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Dynamic counters: one increment per distinct output key.
        String counterName = key.getLogTime().contains("2021/1")
                ? JANUARY_COUNTER
                : FEBRUARY_COUNTER;
        context.getCounter(COUNTER_GROUP, counterName).increment(1);

        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
6.编写Driver模块:
package com.maidu.countlog;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /*** @author:yt* @since:2024-05-15*/ public class LogCount {public static void main(String[] args) throws Exception {Configuration conf =new Configuration();Job job =Job.getInstance(conf,"Log count");job.setJarByClass(LogCount.class);job.setMapperClass(LogCountMapper.class);job.setReducerClass(LogCountReducer.class);job.setCombinerClass(LogCoutCombiner.class);job.setPartitionerClass(LogCountPartitioner.class);//设置reduce任务数2job.setNumReduceTasks(2);job.setOutputKeyClass(MemberLogTime.class);job.setOutputValueClass(IntWritable.class);job.setInputFormatClass(SequenceFileAsTextInputFormat.class);job.setOutputFormatClass(TextOutputFormat.class);//添加输入路径FileInputFormat.addInputPath(job,new Path(args[0]));FileOutputFormat.setOutputPath(job,new Path(args[1]));System.exit(job.waitForCompletion(true)?0:1);} }
7.使用Maven打包为Jar文件,上传到master节点上执行
[yt@master ~]$ hadoop jar countLog-1.0-SNAPSHOT.jar com.maidu.countlog.LogCount /ouput-2301-select/part-m-00000 /logcount/2301/
8.查看运行结果
(1)计数器
(2)结果
(3)查看其中一个文件内容
[yt@master ~]$ hdfs dfs -cat /logcount/2301/part-r-00001