E-commerce Recommendation System


This post documents the main implementation steps of a product recommendation system.

1. Obtaining user preference values for products
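The raw behavior log data/userLog.log is plain comma-separated text. Judging from the parsing code below, each line is assumed to hold a user id, a product id and an action name; the sample lines here are hypothetical:

    11,375,PaySuccess
    11,203,AddCar
    5,375,CreateOrder

The mapper converts each action into a preference weight (PaySuccess and AddReview 0.3, CreateOrder 0.2, AddCar 0.15, anything else — a plain page view — 0.05), and the reducer sums those weights per user:product pair.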


Implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;

public class GoodsStep1 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep1(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS1Mapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            // Convert the user action into a preference value
            double like = 0.0;
            if (split.length >= 3) {
                String str = split[2].toLowerCase();
                if (str.equals("paysuccess")) {          // payment succeeded
                    like = 0.3;
                } else if (str.equals("addreview")) {    // review added
                    like = 0.3;
                } else if (str.equals("createorder")) {  // order created
                    like = 0.2;
                } else if (str.equals("addcar")) {       // added to cart
                    like = 0.15;
                } else {                                 // plain page view
                    like = 0.05;
                }
            }
            // key = user:product, value = preference
            Text outkey = new Text(split[0] + ":" + split[1]);
            DoubleWritable outvalue = new DoubleWritable(like);
            context.write(outkey, outvalue);
        }
    }

    public static class GS1Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // Use BigDecimal while summing to avoid floating-point precision loss
            BigDecimal sum = new BigDecimal(0.0);
            for (DoubleWritable value : values) {
                BigDecimal v = new BigDecimal(value.get());
                sum = sum.add(v);
            }
            DoubleWritable outvalue = new DoubleWritable(sum.doubleValue());
            context.write(key, outvalue);
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step1");
        job.setJarByClass(this.getClass());
        // Configure the mapper and its output key/value types
        job.setMapperClass(GS1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        // Configure the reducer and its output key/value types
        job.setReducerClass(GS1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input format and path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("data/userLog.log"));
        // Output format and path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step1"));
        // Run the job
        job.waitForCompletion(true);
        return 0;
    }
}
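Step 1 emits one line per user:product pair, tab-separated from the accumulated preference value, e.g.:

    11:375	0.25

Because the reducer does nothing but sum, the same class could also be registered as a combiner to shrink the shuffle; a minimal optional addition to run() (not part of the original job):

    job.setCombinerClass(GS1Reducer.class);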

2. Organizing the preference data into a preference matrix


Implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep2 extends Configured implements Tool {

    public static class GS2Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String uid_gid = key.toString();
            String[] split = uid_gid.split(":");
            // Use the product id as the output key
            Text outkey = new Text(split[1]);
            // Combine the user id and the preference value as the output value
            Text outvalue = new Text(split[0] + ":" + value.toString());
            context.write(outkey, outvalue);
        }
    }

    public static class GS2Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate all user:preference pairs of one product into a single matrix row
            StringBuffer buffer = new StringBuffer();
            for (Text value : values) {
                buffer.append(value.toString()).append(",");
            }
            buffer.setLength(buffer.length() - 1);
            context.write(key, new Text(buffer.toString()));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep2(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
        Job job = Job.getInstance(conf, "step2");
        job.setJarByClass(this.getClass());
        job.setMapperClass(GS2Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(GS2Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step1/part-r-00000"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step2"));
        job.waitForCompletion(true);
        return 0;
    }
}
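Each output line is one row of the preference matrix: a product id followed by every user:preference pair for that product, as in the sample row quoted in GoodsStep6's comment below:

    375	11:0.25,5:0.25,4:0.55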

3. Counting product co-occurrences


Implementation

Cartesian product
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.ArrayList;

public class GoodsStep3 extends Configured implements Tool {

    public static class GS3Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input key is user:product; emit user -> product
            String uid_gid = key.toString();
            String[] split = uid_gid.split(":");
            Text outkey = new Text(split[0]);
            Text outvalue = new Text(split[1]);
            context.write(outkey, outvalue);
        }
    }

    public static class GS3Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Collect all products the user interacted with
            ArrayList<String> list = new ArrayList<>();
            for (Text value : values) {
                list.add(value.toString());
            }
            // Emit the Cartesian product of the list with itself, excluding identical pairs
            for (String g1 : list) {
                for (String g2 : list) {
                    if (!g1.equals(g2)) {
                        Text outkey = new Text(g1);
                        Text outvalue = new Text(g2);
                        context.write(outkey, outvalue);
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep3(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
        Job job = Job.getInstance(conf, "step3");
        job.setJarByClass(this.getClass());
        job.setMapperClass(GS3Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(GS3Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step1/part-r-00000"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step3"));
        job.waitForCompletion(true);
        return 0;
    }
}
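For each user the reducer emits every ordered pair of distinct products that user touched. If a user interacted with products 375, 203 and 961 (ids borrowed from the sample rows used later), the job would output:

    375	203
    375	961
    203	375
    203	961
    961	375
    961	203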
Co-occurrence counts
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep4 extends Configured implements Tool {

    public static class GS4Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line is "productA <tab> productB"; emit productA:productB -> 1
            String[] split = value.toString().split("\t");
            String outkey = split[0] + ":" + split[1];
            context.write(new Text(outkey), new IntWritable(1));
        }
    }

    public static class GS4Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Count how many times this product pair co-occurs
            int sum = 0;
            for (IntWritable i : values) {
                sum += i.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep4(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step4");
        job.setJarByClass(this.getClass());
        job.setMapperClass(GS4Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(GS4Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("src/main/resources/step3/part-r-00000"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step4"));
        job.waitForCompletion(true);
        return 0;
    }
}
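Counting identical pairs produces lines of the form productA:productB, tab-separated from the co-occurrence count, for example (consistent with the sample matrix row shown in the next step):

    375:90	2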

4. Building the product co-occurrence matrix


Implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep5 extends Configured implements Tool {

    public static class GS5Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input key is "productA:productB", value is the co-occurrence count
            String goods = key.toString();
            String[] split = goods.split(":");
            // key = first product, value = "second product:count"
            Text outkey = new Text(split[0]);
            Text outvalue = new Text(split[1] + ":" + value.toString());
            context.write(outkey, outvalue);
        }
    }

    public static class GS5Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Concatenate all "product:count" pairs into one matrix row
            StringBuffer buffer = new StringBuffer();
            for (Text value : values) {
                buffer.append(value.toString()).append(",");
            }
            buffer.setLength(buffer.length() - 1);
            context.write(key, new Text(buffer.toString()));
        }
    }

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep5(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step5");
        job.setJarByClass(this.getClass());
        job.setMapperClass(GS5Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(GS5Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step4/part-r-00000"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step5"));
        job.waitForCompletion(true);
        return 0;
    }
}
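A row of the co-occurrence matrix lists, for one product, every product that co-occurred with it and how often, as in the sample row quoted in GoodsStep6's comment:

    375	203:1,961:1,91:1,90:2,89:1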

5. Computing recommendation values


Implementation

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;

public class GoodsStep6 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep6(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Input from step 2 (preference matrix):
     *   375	11:0.25,5:0.25,4:0.55
     *   product	user:preference,...
     * Input from step 5 (co-occurrence matrix):
     *   375	203:1,961:1,91:1,90:2,89:1
     *   product	product:co-occurrence count,...
     * Output:
     *   user:product	recommendation value
     */
    public static class GS6Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split(",");
            for (String str : split) {
                // key = product, value = {user:preference} or {product:co-occurrence count}
                context.write(key, new Text(str));
            }
        }
    }

    public static class GS6Reducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Preference entries [user -> preference]
            HashMap<String, String> like = new HashMap<>();
            // Co-occurrence entries [product -> count]
            HashMap<String, String> same = new HashMap<>();
            for (Text value : values) {
                String data = value.toString();
                String[] split = data.split(":");
                // Preference values contain a decimal point, co-occurrence counts do not
                if (split[1].contains(".")) {
                    like.put(split[0], split[1]);
                } else {
                    same.put(split[0], split[1]);
                }
            }
            for (Map.Entry<String, String> l : like.entrySet()) {
                for (Map.Entry<String, String> s : same.entrySet()) {
                    // user preference value
                    BigDecimal lvalue = new BigDecimal(l.getValue());
                    // product co-occurrence count
                    BigDecimal svalue = new BigDecimal(s.getValue());
                    // key = user:co-occurring product
                    Text outkey = new Text(l.getKey() + ":" + s.getKey());
                    double outvalue = lvalue.multiply(svalue).doubleValue();
                    context.write(outkey, new DoubleWritable(outvalue));
                }
            }
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step6");
        job.setJarByClass(this.getClass());
        job.setMapperClass(GS6Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(GS6Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job,
                new Path("src/main/resources/step2"),
                new Path("src/main/resources/step5"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step6"));
        job.waitForCompletion(true);
        return 0;
    }
}
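The reducer separates the two merged inputs with a simple trick: preference values always contain a decimal point while co-occurrence counts never do. Every preference is then multiplied by every count under the same product key. With the two sample rows above, user 11's preference 0.25 for product 375 and the co-occurrence count 2 of products 375 and 90 contribute a partial value of 0.25 × 2 = 0.5 to the key 11:90.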

6. Accumulating recommendation values and cleaning the data


Implementation

Accumulating recommendation values
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.math.BigDecimal;

public class GoodsStep7 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep7(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS7Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Pass "user:product -> partial recommendation value" straight through
            context.write(key, value);
        }
    }

    public static class GS7Reducer extends Reducer<Text, Text, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Sum all partial recommendation values with BigDecimal to avoid precision loss
            BigDecimal sum = new BigDecimal(0.0);
            for (Text value : values) {
                BigDecimal v = new BigDecimal(value.toString());
                sum = sum.add(v);
            }
            DoubleWritable outvalue = new DoubleWritable(sum.doubleValue());
            context.write(key, outvalue);
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step7");
        job.setJarByClass(this.getClass());
        job.setMapperClass(GS7Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(GS7Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job, new Path("src/main/resources/step6"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step7"));
        job.waitForCompletion(true);
        return 0;
    }
}
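After this step every user:product key appears exactly once with its final recommendation value, in the format consumed later by the export code, e.g. the sample quoted in GoodsStep9:

    11:512	0.25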
Data cleaning
First, count the user:product pairs that were already paid for successfully (kept when the payment count is exactly one).
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class GoodsStep8 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep8(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS8Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit user:product -> 1 for every successful payment record
            String[] split = value.toString().split(",");
            boolean paySuccess = split[2].toLowerCase().equals("paysuccess");
            if (paySuccess) {
                context.write(new Text(split[0] + ":" + split[1]), new IntWritable(1));
            }
        }
    }

    public static class GS8Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keep only pairs with exactly one successful payment
            int num = 0;
            for (IntWritable i : values) {
                num++;
            }
            if (num == 1) context.write(key, new IntWritable(num));
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step8.1");
        job.setJarByClass(this.getClass());
        job.setMapperClass(GS8Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(GS8Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("data/userLog.log"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step8"));
        job.waitForCompletion(true);
        return 0;
    }
}
Then remove those already-paid pairs from the accumulated recommendation data.
package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.util.Iterator;

public class GoodsStep8_2 extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsStep8_2(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class GS8_1Mapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Pass through both inputs: step7 (user:product -> recommendation value)
            // and step8 (user:product -> 1 for purchased pairs)
            context.write(key, value);
        }
    }

    public static class GS8_1Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // A key with a single value only appears in the step7 output, i.e. it was never
            // purchased; keys that also appear in step8 get a second value and are dropped.
            // (An equivalent approach: count the values and write the key only when the count is 1.)
            Iterator<Text> iter = values.iterator();
            Text outvalue = iter.next();
            if (!iter.hasNext()) {
                context.write(key, outvalue);
            }
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "step8.2");
        job.setJarByClass(this.getClass());
        job.setMapperClass(GS8_1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(GS8_1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job,
                new Path("src/main/resources/step7"),
                new Path("src/main/resources/step8"));
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("src/main/resources/step8_2"));
        job.waitForCompletion(true);
        return 0;
    }
}

7. Writing the results to the database

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class GoodsStep9 {

    public static void main(String[] args) {
        try {
            toDB();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void toDB() throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Class.forName("com.mysql.cj.jdbc.Driver");
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/tmall?serverTimezone=Asia/Shanghai", "briup", "briup");
        Statement statement = null;
        FSDataInputStream open = fs.open(new Path("src/main/resources/step8_2/part-r-00000"));
        BufferedReader br = new BufferedReader(new InputStreamReader(open));
        String line = "";
        while ((line = br.readLine()) != null) {
            // Each line looks like "11:512	0.25", i.e. user:product <tab> recommendation value
            String[] str = line.split("\t");
            String[] uid_gid = str[0].split(":");
            statement = conn.createStatement();
            // Delete any previous recommendation for this user/product, then insert the new one
            String sql = "delete from recommend where customerId = '" + uid_gid[0]
                    + "' and bookId = '" + uid_gid[1] + "'";
            String sql2 = "insert into recommend(customerId, bookId, recommendNum) values ('"
                    + uid_gid[0] + "','" + uid_gid[1] + "'," + str[1] + ")";
            statement.addBatch(sql);
            statement.addBatch(sql2);
            statement.executeBatch();
        }
    }
}
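The export concatenates values straight into the SQL strings. A slightly safer variant of the loop body would use PreparedStatement instead; this is only a sketch, assuming the same recommend(customerId, bookId, recommendNum) table and requiring an extra import of java.sql.PreparedStatement:

    // Hypothetical rewrite of the loop body with parameterized statements
    String del = "delete from recommend where customerId = ? and bookId = ?";
    String ins = "insert into recommend(customerId, bookId, recommendNum) values (?, ?, ?)";
    try (PreparedStatement delPs = conn.prepareStatement(del);
         PreparedStatement insPs = conn.prepareStatement(ins)) {
        delPs.setString(1, uid_gid[0]);
        delPs.setString(2, uid_gid[1]);
        delPs.executeUpdate();
        insPs.setString(1, uid_gid[0]);
        insPs.setString(2, uid_gid[1]);
        insPs.setDouble(3, Double.parseDouble(str[1]));
        insPs.executeUpdate();
    }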

8. The workflow

package zb.grms;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class GoodsMain extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new GoodsMain(), args);
            // After the whole workflow has finished, export the result to MySQL
            GoodsStep9.toDB();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();
        // The input path could also be read from the configuration (e.g. conf.get("inpath"))
        // instead of being hard-coded
        Path path = new Path("data/userLog.log");
        Path outpath = new Path("src/main/resources/step1");
        Path outpath2 = new Path("src/main/resources/step2");
        Path outpath3 = new Path("src/main/resources/step3");
        Path outpath4 = new Path("src/main/resources/step4");
        Path outpath5 = new Path("src/main/resources/step5");
        Path outpath6 = new Path("src/main/resources/step6");
        Path outpath7 = new Path("src/main/resources/step7");
        Path outpath8 = new Path("src/main/resources/step8");
        Path outpath9 = new Path("src/main/resources/step8_2");

        // Configure every MapReduce job of the pipeline
        // step1: user preference values
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setMapperClass(GoodsStep1.GS1Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setReducerClass(GoodsStep1.GS1Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, path);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, outpath);

        // step8: user:product pairs that were already purchased
        Job job8 = Job.getInstance(conf);
        job8.setJarByClass(this.getClass());
        job8.setMapperClass(GoodsStep8.GS8Mapper.class);
        job8.setMapOutputKeyClass(Text.class);
        job8.setMapOutputValueClass(IntWritable.class);
        job8.setReducerClass(GoodsStep8.GS8Reducer.class);
        job8.setOutputKeyClass(Text.class);
        job8.setOutputValueClass(IntWritable.class);
        job8.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job8, path);
        job8.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job8, outpath8);

        // step2: preference matrix
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(this.getClass());
        job2.setMapperClass(GoodsStep2.GS2Mapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setReducerClass(GoodsStep2.GS2Reducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        job2.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job2, outpath);
        job2.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job2, outpath2);

        // step3: Cartesian product of each user's products
        Job job3 = Job.getInstance(conf);
        job3.setJarByClass(this.getClass());
        job3.setMapperClass(GoodsStep3.GS3Mapper.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setReducerClass(GoodsStep3.GS3Reducer.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);
        job3.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job3, outpath);
        job3.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job3, outpath3);

        // step4: co-occurrence counts
        Job job4 = Job.getInstance(conf);
        job4.setJarByClass(this.getClass());
        job4.setMapperClass(GoodsStep4.GS4Mapper.class);
        job4.setMapOutputKeyClass(Text.class);
        job4.setMapOutputValueClass(IntWritable.class);
        job4.setReducerClass(GoodsStep4.GS4Reducer.class);
        job4.setOutputKeyClass(Text.class);
        job4.setOutputValueClass(IntWritable.class);
        job4.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job4, outpath3);
        job4.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job4, outpath4);

        // step5: co-occurrence matrix
        Job job5 = Job.getInstance(conf);
        job5.setJarByClass(this.getClass());
        job5.setMapperClass(GoodsStep5.GS5Mapper.class);
        job5.setMapOutputKeyClass(Text.class);
        job5.setMapOutputValueClass(Text.class);
        job5.setReducerClass(GoodsStep5.GS5Reducer.class);
        job5.setOutputKeyClass(Text.class);
        job5.setOutputValueClass(Text.class);
        job5.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.addInputPath(job5, outpath4);
        job5.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job5, outpath5);

        // step6: per-pair recommendation values (preference x co-occurrence)
        Job job6 = Job.getInstance(conf);
        job6.setJarByClass(this.getClass());
        job6.setMapperClass(GoodsStep6.GS6Mapper.class);
        job6.setMapOutputKeyClass(Text.class);
        job6.setMapOutputValueClass(Text.class);
        job6.setReducerClass(GoodsStep6.GS6Reducer.class);
        job6.setOutputKeyClass(Text.class);
        job6.setOutputValueClass(DoubleWritable.class);
        job6.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job6, outpath2, outpath5);
        job6.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job6, outpath6);

        // step7: accumulate recommendation values
        Job job7 = Job.getInstance(conf);
        job7.setJarByClass(this.getClass());
        job7.setMapperClass(GoodsStep7.GS7Mapper.class);
        job7.setMapOutputKeyClass(Text.class);
        job7.setMapOutputValueClass(Text.class);
        job7.setReducerClass(GoodsStep7.GS7Reducer.class);
        job7.setOutputKeyClass(Text.class);
        job7.setOutputValueClass(DoubleWritable.class);
        job7.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job7, outpath6);
        job7.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job7, outpath7);

        // step9 (8.2): remove pairs that were already purchased
        Job job9 = Job.getInstance(conf);
        job9.setJarByClass(this.getClass());
        job9.setMapperClass(GoodsStep8_2.GS8_1Mapper.class);
        job9.setMapOutputKeyClass(Text.class);
        job9.setMapOutputValueClass(Text.class);
        job9.setReducerClass(GoodsStep8_2.GS8_1Reducer.class);
        job9.setOutputKeyClass(Text.class);
        job9.setOutputValueClass(Text.class);
        job9.setInputFormatClass(KeyValueTextInputFormat.class);
        KeyValueTextInputFormat.setInputPaths(job9, outpath7, outpath8);
        job9.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job9, outpath9);

        // Wrap each job in a ControlledJob
        ControlledJob cj = new ControlledJob(conf);
        cj.setJob(job);
        ControlledJob cj2 = new ControlledJob(conf);
        cj2.setJob(job2);
        ControlledJob cj3 = new ControlledJob(conf);
        cj3.setJob(job3);
        ControlledJob cj4 = new ControlledJob(conf);
        cj4.setJob(job4);
        ControlledJob cj5 = new ControlledJob(conf);
        cj5.setJob(job5);
        ControlledJob cj6 = new ControlledJob(conf);
        cj6.setJob(job6);
        ControlledJob cj7 = new ControlledJob(conf);
        cj7.setJob(job7);
        ControlledJob cj8 = new ControlledJob(conf);
        cj8.setJob(job8);
        ControlledJob cj9 = new ControlledJob(conf);
        cj9.setJob(job9);

        // Declare the dependencies between jobs
        cj2.addDependingJob(cj);
        cj3.addDependingJob(cj);
        cj4.addDependingJob(cj3);
        cj5.addDependingJob(cj4);
        cj6.addDependingJob(cj2);
        cj6.addDependingJob(cj5);
        cj7.addDependingJob(cj6);
        cj9.addDependingJob(cj7);
        cj9.addDependingJob(cj8);

        // Build the workflow controller
        JobControl jobs = new JobControl("work_flow");
        jobs.addJob(cj);
        jobs.addJob(cj2);
        jobs.addJob(cj3);
        jobs.addJob(cj4);
        jobs.addJob(cj5);
        jobs.addJob(cj6);
        jobs.addJob(cj7);
        jobs.addJob(cj8);
        jobs.addJob(cj9);

        // Start the controller: it runs all MapReduce jobs in dependency order
        Thread t = new Thread(jobs);
        t.start();
        while (true) {
            if (jobs.allFinished()) {
                System.out.println("All jobs finished");
                System.out.println(jobs.getSuccessfulJobList());
                jobs.stop();
                return 0;
            } else if (jobs.getFailedJobList().size() > 0) {
                System.out.println("A job failed");
                System.out.println(jobs.getFailedJobList());
                jobs.stop();
                return -1;
            }
        }
    }
}
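The monitoring loop at the end of run() spins without pausing while the jobs execute. Adding a short sleep as a third branch keeps it from burning a CPU core; a minimal tweak, not part of the original code:

    } else {
        Thread.sleep(500); // poll the JobControl every half second instead of busy-waiting
    }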

Summary

Chained together by JobControl, the nine MapReduce jobs turn the raw behavior log into per-user recommendation values: compute preference values, reshape them into a preference matrix, count product co-occurrences and build the co-occurrence matrix, multiply and accumulate to get recommendation values, drop products the user has already paid for, and finally write the cleaned results into the recommend table.
