文章目录
- 第1章 项目体系框架设计(说明书)
- 第2章 工具环境搭建(说明书)
- 第3章 项目创建并初始化业务数据
- 3.1 IDEA创建Maven项目(略)
- 3.2 数据加载准备(说明书)
- 3.3 数据初始化到MongoDB 【DataLoader 数据加载模块】
- 数据加载程序主体实现 + 数据写入MongoDB
- StatisticsRecommender 统计推荐模块
- 第4章 离线推荐服务建设
- 4.1 离线推荐服务
- 4.2 离线统计服务 【统计推荐模块】
- 4.3 基于隐语义模型的协同过滤推荐 【LFM的离线推荐模块】
- 4.3.1 用户商品推荐列表
- 4.3.2 商品相似度矩阵
- 4.3.3 模型评估和参数选取
- 第5章 实时推荐服务建设【实时推荐模块】
- 5.2 实时推荐模型和代码框架
- 5.2.1 实时推荐模型算法设计
- 5.2.2 实时推荐模块框架
- 5.3 实时推荐算法的实现
- 5.3.1 获取用户的K次最近评分
- 5.3.2 获取当前商品最相似的K个商品
- 5.3.3 商品推荐优先级计算
- 5.3.4 将结果保存到mongoDB
- 5.3.5 更新实时推荐结果
- 5.4 实时系统联调
- 5.4.1 启动实时系统的基本组件
- 5.4.2 启动zookeeper
- 5.4.3 启动kafka
- 5.4.4 构建Kafka Streaming程序
- 5.4.5 配置并启动flume
- 5.4.6 启动业务系统后台
- 第6章 冷启动问题处理
- 第7章 其它形式的离线相似推荐服务
- 7.1 基于内容的相似推荐
- 7.2 基于物品的协同过滤相似推荐
- 第8章 程序部署与运行
- 8.1 发布项目
- 8.2 安装前端项目
- 8.3 安装业务服务器
- 8.4 Kafka配置与启动
- 8.5 Flume配置与启动
- 8.6 部署流式计算服务
- 8.7 Azkaban调度离线算法
第1章 项目体系框架设计(说明书)
第2章 工具环境搭建(说明书)
- MongoDB安装最新版本 => 解决Ubuntu安装mongodb缺少依赖问题
- 使用CentOS7系统,按照工具环境搭建流程安装MongoDB、Redis、Spark、Zookeeper、Flume-ng、Kafka
第3章 项目创建并初始化业务数据
3.1 IDEA创建Maven项目(略)
3.2 数据加载准备(说明书)
3.3 数据初始化到MongoDB 【DataLoader 数据加载模块】
数据加载程序主体实现 + 数据写入MongoDB
- 为原始数据定义几个样例类,通过SparkContext的textFile方法从文件中读取数据,并转换成DataFrame,再利用Spark SQL提供的write方法进行数据的分布式插入。
- 在DataLoader/src/main/scala下新建package,命名为com.atguigu.recommender,新建名为DataLoader的scala class文件。
- 防火墙问题:连接mongodb需要关闭防火墙
StatisticsRecommender 统计推荐模块
代码解析:
- 临时表 -> 结果表
- 注册UDF,将timestamp转化为年月格式yyyyMM
spark.udf.register("changeDate", (x: Int)=>simpleDateFormat.format(new Date(x * 1000L)).toInt)
第4章 离线推荐服务建设
4.1 离线推荐服务
- 离线推荐服务是综合用户所有的历史数据,利用设定的离线统计算法和离线推荐算法周期性的进行结果统计与保存,计算的结果在一定时间周期内是固定不变的,变更的频率取决于算法调度的频率。
- 离线推荐服务主要计算一些可以预先进行统计和计算的指标,为实时计算和前端业务相应提供数据支撑。
- 离线推荐服务主要分为统计推荐、基于隐语义模型的协同过滤推荐以及基于内容和基于Item-CF的相似推荐。
- 这一章主要介绍前两部分,基于内容和Item-CF的推荐在整体结构和实现上是类似的,我们将在第7章详细介绍。
4.2 离线统计服务 【统计推荐模块】
-
在recommender下新建子项目StatisticsRecommender,pom.xml文件中只需引入spark、scala和mongodb的相关依赖:
-
在resources文件夹下引入log4j.properties,然后在src/main/scala下新建scala 单例对象com.atguigu.statistics.StatisticsRecommender。
-
同样,我们应该先建好样例类,在main()方法中定义配置、创建SparkSession并加载数据,最后关闭spark。
-
历史热门商品统计:根据所有历史评分数据,计算历史评分次数最多的商品
- 通过Spark SQL读取评分数据集,统计所有评分中评分数最多的商品
- 然后按照从大到小排序,将最终结果写入MongoDB的RateMoreProducts数据集中
-
最近热门商品统计:根据评分,按月为单位计算最近时间的月份里面评分数最多的商品集合
- 通过Spark SQL读取评分数据集,通过UDF函数将评分的数据时间修改为月,然后统计每月商品的评分数
- 统计完成之后将数据写入到MongoDB的RateMoreRecentlyProducts数据集中
-
商品平均得分统计:根据历史数据中所有用户对商品的评分,周期性的计算每个商品的平均得分
- 通过Spark SQL读取保存在MongDB中的Rating数据集,通过执行以下SQL语句实现对于商品的平均分统计
- 统计完成之后将生成的新的DataFrame写出到MongoDB的AverageProducts集合中
主体代码(src/main/scala/com.atguigu.statistics/StatisticsRecommender.scala):
package com.atguigu.statisticsimport java.text.SimpleDateFormat
import java.util.Dateimport org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}case class Rating( userId: Int, productId: Int, score: Double, timestamp: Int )
case class MongoConfig( uri: String, db: String )object StatisticsRecommender {// 定义mongodb中存储的表名val MONGODB_RATING_COLLECTION = "Rating"val RATE_MORE_PRODUCTS = "RateMoreProducts"val RATE_MORE_RECENTLY_PRODUCTS = "RateMoreRecentlyProducts"val AVERAGE_PRODUCTS = "AverageProducts"def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[1]","mongo.uri" -> "mongodb://localhost:27017/recommender","mongo.db" -> "recommender")// 创建一个spark configval sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StatisticsRecommender")// 创建spark sessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )// 加载数据val ratingDF = spark.read.option("uri", mongoConfig.uri).option("collection", MONGODB_RATING_COLLECTION).format("com.mongodb.spark.sql").load().as[Rating].toDF()// 创建一张叫ratings的临时表ratingDF.createOrReplaceTempView("ratings")// TODO: 【 用spark sql去做不同的统计推荐 】// todo: (1)历史热门商品,按照评分个数统计,productId,countval rateMoreProductsDF = spark.sql("select productId, count(productId) as count from ratings group by productId order by count desc")storeDFInMongoDB( rateMoreProductsDF, RATE_MORE_PRODUCTS )// todo: (2)近期热门商品,把时间戳转换成yyyyMM格式进行评分个数统计,最终得到productId, count, yearmonth// 创建一个日期格式化工具val simpleDateFormat = new SimpleDateFormat("yyyyMM")// 注册UDF,将timestamp转化为年月格式yyyyMMspark.udf.register("changeDate", (x: Int)=>simpleDateFormat.format(new Date(x * 1000L)).toInt)// 把原始rating数据转换成想要的结构productId, score, yearmonthval ratingOfYearMonthDF = spark.sql("select productId, score, changeDate(timestamp) as yearmonth from ratings")ratingOfYearMonthDF.createOrReplaceTempView("ratingOfMonth")val rateMoreRecentlyProductsDF = spark.sql("select productId, count(productId) as count, yearmonth from ratingOfMonth group by yearmonth, productId order by yearmonth desc, count desc")// 把df保存到mongodbstoreDFInMongoDB( rateMoreRecentlyProductsDF, RATE_MORE_RECENTLY_PRODUCTS )// todo: (3)优质商品统计,商品的平均评分,productId,avgval averageProductsDF = spark.sql("select productId, avg(score) as avg from ratings group by productId order by avg desc")storeDFInMongoDB( averageProductsDF, AVERAGE_PRODUCTS )spark.stop()}// TODO: 【 保存到MongoDB数据库 】def storeDFInMongoDB(df: DataFrame, collection_name: String)(implicit mongoConfig: MongoConfig): Unit ={df.write.option("uri", mongoConfig.uri).option("collection", collection_name).mode("overwrite").format("com.mongodb.spark.sql").save()}
}
4.3 基于隐语义模型的协同过滤推荐 【LFM的离线推荐模块】
- 项目采用ALS作为协同过滤算法,根据MongoDB中的用户评分表计算离线的用户商品推荐列表以及商品相似度矩阵。
4.3.1 用户商品推荐列表
-
通过ALS训练出来的Model来计算所有当前用户商品的推荐列表,主要思路如下:
- userId和productId做笛卡尔积,产生(userId,productId)的元组
- 通过模型预测(userId,productId)对应的评分。
- 将预测结果通过预测分值进行排序。
- 返回分值最大的K个商品,作为当前用户的推荐列表。
-
最后生成的数据结构如下:将数据保存到MongoDB的UserRecs表中
-
新建recommender的子项目OfflineRecommender,引入spark、scala、mongo和jblas的依赖:
-
同样经过前期的构建样例类、声明配置、创建SparkSession等步骤,可以加载数据开始计算模型了。
4.3.2 商品相似度矩阵
-
通过ALS计算商品相似度矩阵,该矩阵用于查询当前商品的相似商品并为实时推荐系统服务。
-
离线计算的ALS 算法,算法最终会为用户、商品分别生成最终的特征矩阵,分别是表示用户特征矩阵的U(m x k)矩阵,每个用户由 k个特征描述;表示物品特征矩阵的V(n x k)矩阵,每个物品也由 k 个特征描述。
-
V(n x k)表示物品特征矩阵,每一行是一个 k 维向量,虽然我们并不知道每一个维度的特征意义是什么,但是k 个维度的数学向量表示了该行对应商品的特征。
-
所以,每个商品用V(n x k)每一行的<t1,t2,t3,…>向量表示其特征,于是任意两个商品 p:特征向量为Vp=< tp1,tp2,tp3,…,tpk >,商品q:特征向量为Vq=< tq1,tq2,tq3,…,tqk >之间的相似度sim(p,q)可以使用和的余弦值来表示:
-
数据集中任意两个商品间相似度都可以由公式计算得到,商品与商品之间的相似度在一段时间内基本是固定值。最后生成的数据保存到MongoDB的ProductRecs表中。
4.3.3 模型评估和参数选取
-
在上述模型训练的过程中,我们直接给定了隐语义模型的rank,iterations,lambda三个参数。
-
对于我们的模型,这并不一定是最优的参数选取,所以我们需要对模型进行评估。
-
通常的做法是计算均方根误差(RMSE),考察预测评分与实际评分之间的误差。
-
有了RMSE,我们可以就可以通过多次调整参数值,来选取RMSE最小的一组作为我们模型的优化选择。
-
其中adjustALSParams方法是模型评估的核心,输入一组训练数据和测试数据,输出计算得到最小RMSE的那组参数。
-
代码实现如下:
-
计算RMSE的函数getRMSE代码实现如下:
-
运行代码得到目前数据的最优模型参数
代码主体:
package com.atguigu.offlineimport org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrixcase class ProductRating( userId: Int, productId: Int, score: Double, timestamp: Int )
case class MongoConfig( uri: String, db: String )// 定义标准推荐对象
case class Recommendation( productId: Int, score: Double )
// 定义用户的推荐列表
case class UserRecs( userId: Int, recs: Seq[Recommendation] )
// 定义商品相似度列表
case class ProductRecs( productId: Int, recs: Seq[Recommendation] )object OfflineRecommender {// 定义mongodb中存储的表名val MONGODB_RATING_COLLECTION = "Rating"val USER_RECS = "UserRecs"val PRODUCT_RECS = "ProductRecs"val USER_MAX_RECOMMENDATION = 20def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://localhost:27017/recommender","mongo.db" -> "recommender")// 创建一个spark configval sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")// 创建spark sessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )// 加载数据val ratingRDD = spark.read.option("uri", mongoConfig.uri).option("collection", MONGODB_RATING_COLLECTION).format("com.mongodb.spark.sql").load().as[ProductRating].rdd.map(rating => (rating.userId, rating.productId, rating.score)).cache()// 提取出所有用户和商品的数据集val userRDD = ratingRDD.map(_._1).distinct()val productRDD = ratingRDD.map(_._2).distinct()// 核心计算过程// 1. 训练隐语义模型val trainData = ratingRDD.map(x=>Rating(x._1,x._2,x._3))// 定义模型训练的参数,rank隐特征个数,iterations迭代词数,lambda正则化系数val ( rank, iterations, lambda ) = ( 5, 10, 0.01 )val model = ALS.train( trainData, rank, iterations, lambda )// 2. 获得预测评分矩阵,得到用户的推荐列表// 用userRDD和productRDD做一个笛卡尔积,得到空的userProductsRDD表示的评分矩阵val userProducts = userRDD.cartesian(productRDD)val preRating = model.predict(userProducts)// 从预测评分矩阵中提取得到用户推荐列表val userRecs = preRating.filter(_.rating>0).map(rating => ( rating.user, ( rating.product, rating.rating ) )).groupByKey().map{case (userId, recs) =>UserRecs( userId, recs.toList.sortWith(_._2>_._2).take(USER_MAX_RECOMMENDATION).map(x=>Recommendation(x._1,x._2)) )}.toDF()userRecs.write.option("uri", mongoConfig.uri).option("collection", USER_RECS).mode("overwrite").format("com.mongodb.spark.sql").save()// 3. 利用商品的特征向量,计算商品的相似度列表val productFeatures = model.productFeatures.map{case (productId, features) => ( productId, new DoubleMatrix(features) )}// 两两配对商品,计算余弦相似度val productRecs = productFeatures.cartesian(productFeatures).filter{case (a, b) => a._1 != b._1}// 计算余弦相似度.map{case (a, b) =>val simScore = consinSim( a._2, b._2 )( a._1, ( b._1, simScore ) )}.filter(_._2._2 > 0.4).groupByKey().map{case (productId, recs) =>ProductRecs( productId, recs.toList.sortWith(_._2>_._2).map(x=>Recommendation(x._1,x._2)) )}.toDF()productRecs.write.option("uri", mongoConfig.uri).option("collection", PRODUCT_RECS).mode("overwrite").format("com.mongodb.spark.sql").save()spark.stop()}def consinSim(product1: DoubleMatrix, product2: DoubleMatrix): Double ={product1.dot(product2)/ ( product1.norm2() * product2.norm2() )}
}
第5章 实时推荐服务建设【实时推荐模块】
- 实时计算与离线计算应用于推荐系统上最大的不同在于实时计算推荐结果应该反映最近一段时间用户近期的偏好,而离线计算推荐结果则是根据用户从第一次评分起的所有评分记录来计算用户总体的偏好。
- 用户对物品的偏好随着时间的推移总是会改变的。
- 比如一个用户u 在某时刻对商品p 给予了极高的评分,那么在近期一段时候,u 极有可能很喜欢与商品p 类似的其他商品;
- 而如果用户u 在某时刻对商品q 给予了极低的评分,那么在近期一段时候,u 极有可能不喜欢与商品q 类似的其他商品。
- 所以对于实时推荐,当用户对一个商品进行了评价后,用户会希望推荐结果基于最近这几次评分进行一定的更新,使得推荐结果匹配用户近期的偏好,满足用户近期的口味。
- 如果实时推荐继续采用离线推荐中的ALS 算法,由于算法运行时间巨大,不具有实时得到新的推荐结果的能力;并且由于算法本身的使用的是评分表,用户本次评分后只更新了总评分表中的一项,使得算法运行后的推荐结果与用户本次评分之前的推荐结果基本没有多少差别,从而给用户一种推荐结果一直没变化的感觉,很影响用户体验。
- 另外,在实时推荐中由于时间性能上要满足实时或者准实时的要求,所以算法的计算量不能太大,避免复杂、过多的计算造成用户体验的下降。鉴于此,推荐精度往往不会很高。实时推荐系统更关心推荐结果的动态变化能力,只要更新推荐结果的理由合理即可,至于推荐的精度要求则可以适当放宽。
- 所以对于实时推荐算法,主要有两点需求:
- 用户本次评分后、或最近几个评分后系统可以明显的更新推荐结果;
- 计算量不大,满足响应时间上的实时或者准实时要求;
5.2 实时推荐模型和代码框架
5.2.1 实时推荐模型算法设计
5.2.2 实时推荐模块框架
-
在recommender下新建子项目StreamingRecommender,引入spark、scala、mongo、redis和kafka的依赖:
-
代码中首先定义样例类和一个连接助手对象(用于建立redis和mongo连接),并在StreamingRecommender中定义一些常量
-
实时推荐主体代码如下:
def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://localhost:27017/recommender","mongo.db" -> "recommender","kafka.topic" -> "recommender")//创建一个SparkConf配置val sparkConf = new SparkConf().setAppName("StreamingRecommender").setMaster(config("spark.cores"))val spark = SparkSession.builder().config(sparkConf).getOrCreate()val sc = spark.sparkContextval ssc = new StreamingContext(sc,Seconds(2))implicit val mongConfig = MongConfig(config("mongo.uri"),config("mongo.db"))import spark.implicits._// 广播商品相似度矩阵//装换成为 Map[Int, Map[Int,Double]]val simProductsMatrix = spark.read.option("uri",config("mongo.uri")).option("collection",MONGODB_PRODUCT_RECS_COLLECTION).format("com.mongodb.spark.sql").load().as[ProductRecs] .rdd.map{recs =>(recs.productId,recs.recs.map(x=> (x.productId,x.score)).toMap)}.collectAsMap() val simProductsMatrixBroadCast = sc.broadcast(simProductsMatrix)//创建到Kafka的连接val kafkaPara = Map("bootstrap.servers" -> "localhost:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "recommender","auto.offset.reset" -> "latest")val kafkaStream = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(config("kafka.topic")),kafkaPara))// UID|MID|SCORE|TIMESTAMP// 产生评分流val ratingStream = kafkaStream.map{case msg=>var attr = msg.value().split("\\|")(attr(0).toInt,attr(1).toInt,attr(2).toDouble,attr(3).toInt)}// 核心实时推荐算法ratingStream.foreachRDD{rdd =>rdd.map{case (userId,productId,score,timestamp) =>println(">>>>>>>>>>>>>>>>")//获取当前最近的M次商品评分val userRecentlyRatings = getUserRecentlyRating(MAX_USER_RATINGS_NUM,userId,ConnHelper.jedis)//获取商品P最相似的K个商品val simProducts = getTopSimProducts(MAX_SIM_PRODUCTS_NUM,productId,userId,simProductsMatrixBroadCast.value)//计算待选商品的推荐优先级val streamRecs = computeProductScores(simProductsMatrixBroadCast.value,userRecentlyRatings,simProducts)//将数据保存到MongoDBsaveRecsToMongoDB(userId,streamRecs)}.count()}//启动Streaming程序ssc.start()ssc.awaitTermination()
}
5.3 实时推荐算法的实现
- 实时推荐算法的前提:
- 在Redis集群中存储了每一个用户最近对商品的K次评分。实时算法可以快速获取。
- 离线推荐算法已经将商品相似度矩阵提前计算到了MongoDB中。
- Kafka已经获取到了用户实时的评分数据。
- 算法过程如下:
- 实时推荐算法输入为一个评分<userId, productId, rate, timestamp>
- 执行的核心内容包括:
- 获取userId 最近K 次评分
- 获取productId 最相似K 个商品
- 计算候选商品的推荐优先级
- 更新对userId 的实时推荐结果
5.3.1 获取用户的K次最近评分
- 业务服务器在接收用户评分的时候,默认会将该评分情况以userId, productId, rate, timestamp的格式插入到Redis中该用户对应的队列当中,在实时算法中,只需要通过Redis客户端获取相对应的队列内容即可
5.3.2 获取当前商品最相似的K个商品
在离线算法中,已经预先将商品的相似度矩阵进行了计算,所以每个商品productId 的最相似的K 个商品很容易获取:从MongoDB中读取ProductRecs数据,从productId 在simHash 对应的子哈希表中获取相似度前K 大的那些商品。输出是数据类型为Array[Int]的数组,表示与productId 最相似的商品集合,并命名为candidateProducts 以作为候选商品集合。
5.3.3 商品推荐优先级计算
-
对于候选商品集合simiHash和userId 的最近K 个评分recentRatings,算法代码内容如下:
-
其中,getProductSimScore是取候选商品和已评分商品的相似度,代码如下:
-
而log是对数运算,这里实现为取10的对数(常用对数):
5.3.4 将结果保存到mongoDB
- saveRecsToMongoDB函数实现了结果的保存:
5.3.5 更新实时推荐结果
- 当计算出候选商品的推荐优先级的数组updatedRecommends<productId, E>后,这个数组将被发送到Web 后台服务器,与后台服务器上userId 的上次实时推荐结果recentRecommends<productId, E>进行合并、替换并选出优先级E 前K大的商品作为本次新的实时推荐。具体而言:
- 合并:将updatedRecommends 与recentRecommends 并集合成为一个新的<productId, E>数组;
- 替换(去重):当updatedRecommends 与recentRecommends 有重复的商品productId 时,recentRecommends 中productId 的推荐优先级由于是上次实时推荐的结果,于是将作废,被替换成代表了更新后的updatedRecommends的productId 的推荐优先级;
- 选取TopK:在合并、替换后的<productId, E>数组上,根据每个product 的推荐优先级,选择出前K 大的商品,作为本次实时推荐的最终结果。
代码主体:
package com.atguigu.onlineimport com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis// 定义一个连接助手对象,建立到redis和mongodb的连接
object ConnHelper extends Serializable{// 懒变量定义,使用的时候才初始化lazy val jedis = new Jedis("localhost")lazy val mongoClient = MongoClient(MongoClientURI("mongodb://localhost:27017/recommender"))
}case class MongoConfig( uri: String, db: String )// 定义标准推荐对象
case class Recommendation( productId: Int, score: Double )
// 定义用户的推荐列表
case class UserRecs( userId: Int, recs: Seq[Recommendation] )
// 定义商品相似度列表
case class ProductRecs( productId: Int, recs: Seq[Recommendation] )object OnlineRecommender {// 定义常量和表名val MONGODB_RATING_COLLECTION = "Rating"val STREAM_RECS = "StreamRecs"val PRODUCT_RECS = "ProductRecs"val MAX_USER_RATING_NUM = 20val MAX_SIM_PRODUCTS_NUM = 20def main(args: Array[String]): Unit = {val config = Map("spark.cores" -> "local[*]","mongo.uri" -> "mongodb://localhost:27017/recommender","mongo.db" -> "recommender","kafka.topic" -> "recommender")// 创建spark confval sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OnlineRecommender")val spark = SparkSession.builder().config(sparkConf).getOrCreate()val sc = spark.sparkContextval ssc = new StreamingContext(sc, Seconds(2))import spark.implicits._implicit val mongoConfig = MongoConfig( config("mongo.uri"), config("mongo.db") )// 加载数据,相似度矩阵,广播出去val simProductsMatrix = spark.read.option("uri", mongoConfig.uri).option("collection", PRODUCT_RECS).format("com.mongodb.spark.sql").load().as[ProductRecs].rdd// 为了后续查询相似度方便,把数据转换成map形式.map{item =>( item.productId, item.recs.map( x=>(x.productId, x.score) ).toMap )}.collectAsMap()// 定义广播变量val simProcutsMatrixBC = sc.broadcast(simProductsMatrix)// 创建kafka配置参数val kafkaParam = Map("bootstrap.servers" -> "localhost:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "recommender","auto.offset.reset" -> "latest")// 创建一个DStreamval kafkaStream = KafkaUtils.createDirectStream[String, String]( ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String]( Array(config("kafka.topic")), kafkaParam ))// 对kafkaStream进行处理,产生评分流,userId|productId|score|timestampval ratingStream = kafkaStream.map{msg=>var attr = msg.value().split("\\|")( attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt )}// 核心算法部分,定义评分流的处理流程ratingStream.foreachRDD{rdds => rdds.foreach{case ( userId, productId, score, timestamp ) =>println("rating data coming!>>>>>>>>>>>>>>>>>>")// TODO: 核心算法流程// 1. 从redis里取出当前用户的最近评分,保存成一个数组Array[(productId, score)]val userRecentlyRatings = getUserRecentlyRatings( MAX_USER_RATING_NUM, userId, ConnHelper.jedis )// 2. 从相似度矩阵中获取当前商品最相似的商品列表,作为备选列表,保存成一个数组Array[productId]val candidateProducts = getTopSimProducts( MAX_SIM_PRODUCTS_NUM, productId, userId, simProcutsMatrixBC.value )// 3. 计算每个备选商品的推荐优先级,得到当前用户的实时推荐列表,保存成 Array[(productId, score)]val streamRecs = computeProductScore( candidateProducts, userRecentlyRatings, simProcutsMatrixBC.value )// 4. 把推荐列表保存到mongodbsaveDataToMongoDB( userId, streamRecs )}}// 启动streamingssc.start()println("streaming started!")ssc.awaitTermination()}/*** 从redis里获取最近num次评分*/import scala.collection.JavaConversions._def getUserRecentlyRatings(num: Int, userId: Int, jedis: Jedis): Array[(Int, Double)] = {// 从redis中用户的评分队列里获取评分数据,list键名为uid:USERID,值格式是 PRODUCTID:SCOREjedis.lrange( "userId:" + userId.toString, 0, num ).map{ item =>val attr = item.split("\\:")( attr(0).trim.toInt, attr(1).trim.toDouble )}.toArray}// 获取当前商品的相似列表,并过滤掉用户已经评分过的,作为备选列表def getTopSimProducts(num: Int,productId: Int,userId: Int,simProducts: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]])(implicit mongoConfig: MongoConfig): Array[Int] ={// 从广播变量相似度矩阵中拿到当前商品的相似度列表val allSimProducts = simProducts(productId).toArray// 获得用户已经评分过的商品,过滤掉,排序输出val ratingCollection = ConnHelper.mongoClient( mongoConfig.db )( MONGODB_RATING_COLLECTION )val ratingExist = ratingCollection.find( MongoDBObject("userId"->userId) ).toArray.map{item=> // 只需要productIditem.get("productId").toString.toInt}// 从所有的相似商品中进行过滤allSimProducts.filter( x => ! ratingExist.contains(x._1) ).sortWith(_._2 > _._2).take(num).map(x=>x._1)}// 计算每个备选商品的推荐得分def computeProductScore(candidateProducts: Array[Int],userRecentlyRatings: Array[(Int, Double)],simProducts: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] ={// 定义一个长度可变数组ArrayBuffer,用于保存每一个备选商品的基础得分,(productId, score)val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()// 定义两个map,用于保存每个商品的高分和低分的计数器,productId -> countval increMap = scala.collection.mutable.HashMap[Int, Int]()val decreMap = scala.collection.mutable.HashMap[Int, Int]()// 遍历每个备选商品,计算和已评分商品的相似度for( candidateProduct <- candidateProducts; userRecentlyRating <- userRecentlyRatings ){// 从相似度矩阵中获取当前备选商品和当前已评分商品间的相似度val simScore = getProductsSimScore( candidateProduct, userRecentlyRating._1, simProducts )if( simScore > 0.4 ){// 按照公式进行加权计算,得到基础评分scores += ( (candidateProduct, simScore * userRecentlyRating._2) )if( userRecentlyRating._2 > 3 ){increMap(candidateProduct) = increMap.getOrDefault(candidateProduct, 0) + 1} else {decreMap(candidateProduct) = decreMap.getOrDefault(candidateProduct, 0) + 1}}}// 根据公式计算所有的推荐优先级,首先以productId做groupbyscores.groupBy(_._1).map{case (productId, scoreList) =>( productId, scoreList.map(_._2).sum/scoreList.length + log(increMap.getOrDefault(productId, 1)) - log(decreMap.getOrDefault(productId, 1)) )}// 返回推荐列表,按照得分排序.toArray.sortWith(_._2>_._2)}def getProductsSimScore(product1: Int, product2: Int,simProducts: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Double ={simProducts.get(product1) match {case Some(sims) => sims.get(product2) match {case Some(score) => scorecase None => 0.0}case None => 0.0}}// 自定义log函数,以N为底def log(m: Int): Double = {val N = 10math.log(m)/math.log(N)}// 写入mongodbdef saveDataToMongoDB(userId: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit ={val streamRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(STREAM_RECS)// 按照userId查询并更新streamRecsCollection.findAndRemove( MongoDBObject( "userId" -> userId ) )streamRecsCollection.insert( MongoDBObject( "userId" -> userId,"recs" -> streamRecs.map(x=>MongoDBObject("productId"->x._1, "score"->x._2)) ) )}}
5.4 实时系统联调
- 我们的系统实时推荐的数据流向是:业务系统 -> 日志 -> flume 日志采集 -> kafka streaming数据清洗和预处理 -> spark streaming 流式计算。在我们完成实时推荐服务的代码后,应该与其它工具进行联调测试,确保系统正常运行。
5.4.1 启动实时系统的基本组件
- 启动实时推荐系统StreamingRecommender以及mongodb、redis
5.4.2 启动zookeeper
bin/zkServer.sh start
5.4.3 启动kafka
bin/kafka-server-start.sh -daemon ./config/server.properties
5.4.4 构建Kafka Streaming程序
- 在recommender下新建module,KafkaStreaming,主要用来做日志数据的预处理,过滤出需要的内容。pom.xml文件需要引入依赖:
- 在src/main/java下新建java类com.atguigu.kafkastreaming.Application
public class Application {public static void main(String[] args){String brokers = "localhost:9092";String zookeepers = "localhost:2181";// 定义输入和输出的topicString from = "log";String to = "recommender";// 定义kafka streaming的配置Properties settings = new Properties();settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);StreamsConfig config = new StreamsConfig(settings);// 拓扑建构器TopologyBuilder builder = new TopologyBuilder();// 定义流处理的拓扑结构builder.addSource("SOURCE", from).addProcessor("PROCESS", () -> new LogProcessor(), "SOURCE").addSink("SINK", to, "PROCESS");KafkaStreams streams = new KafkaStreams(builder, config);streams.start();}
}
- 这个程序会将topic为“log”的信息流获取来做处理,并以“recommender”为新的topic转发出去。
- 流处理程序 LogProcess.java
public class LogProcessor implements Processor<byte[],byte[]> {private ProcessorContext context;public void init(ProcessorContext context) {this.context = context;}public void process(byte[] dummy, byte[] line) {String input = new String(line);// 根据前缀过滤日志信息,提取后面的内容if(input.contains("PRODUCT_RATING_PREFIX:")){System.out.println("product rating coming!!!!" + input);input = input.split("PRODUCT_RATING_PREFIX:")[1].trim();context.forward("logProcessor".getBytes(), input.getBytes());}}public void punctuate(long timestamp) {}public void close() {}
}
- 完成代码后,启动Application。
5.4.5 配置并启动flume
- 在flume的conf目录下新建log-kafka.properties,对flume连接kafka做配置:
agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink# For each one of the sources, the type is defined
agent.sources.exectail.type = exec
# 下面这个路径是需要收集日志的绝对路径,改为自己的日志目录
agent.sources.exectail.command = tail –f
/mnt/d/Projects/BigData/ECommerceRecommenderSystem/businessServer/src/main/log/agent.log
agent.sources.exectail.interceptors=i1
agent.sources.exectail.interceptors.i1.type=regex_filter
# 定义日志过滤前缀的正则
agent.sources.exectail.interceptors.i1.regex=.+PRODUCT_RATING_PREFIX.+
# The channel can be defined as follows.
agent.sources.exectail.channels = memoryChannel# Each sink's type must be defined
agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.kafka.topic = log
agent.sinks.kafkasink.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafkasink.kafka.producer.acks = 1
agent.sinks.kafkasink.kafka.flumeBatchSize = 20#Specify the channel the sink should use
agent.sinks.kafkasink.channel = memoryChannel# Each channel's type is defined.
agent.channels.memoryChannel.type = memory# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000
配置好后,启动flume:
./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console
5.4.6 启动业务系统后台
- 将业务代码加入系统中。注意在src/main/resources/ 下的 log4j.properties中,log4j.appender.file.File的值应该替换为自己的日志目录,与flume中的配置应该相同。
- 启动业务系统后台,访问localhost:8088/index.html;点击某个商品进行评分,查看实时推荐列表是否会发生变化。
第6章 冷启动问题处理
- 整个推荐系统更多的是依赖于用于的偏好信息进行商品的推荐,那么就会存在一个问题,对于新注册的用户是没有任何偏好信息记录的,那这个时候推荐就会出现问题,导致没有任何推荐的项目出现。
- 处理这个问题一般是通过当用户首次登陆时,为用户提供交互式的窗口来获取用户对于物品的偏好,让用户勾选预设的兴趣标签。
当获取用户的偏好之后,就可以直接给出相应类型商品的推荐。
第7章 其它形式的离线相似推荐服务
7.1 基于内容的相似推荐
- 原始数据中的tag文件,是用户给商品打上的标签,这部分内容想要直接转成评分并不容易,不过我们可以将标签内容进行提取,得到商品的内容特征向量,进而可以通过求取相似度矩阵。这部分可以与实时推荐系统直接对接,计算出与用户当前评分商品的相似商品,实现基于内容的实时推荐。为了避免热门标签对特征提取的影响,我们还可以通过TF-IDF算法对标签的权重进行调整,从而尽可能地接近用户偏好。
- 基于以上思想,加入TF-IDF算法的求取商品特征向量的核心代码如下:
// 载入商品数据集
val productTagsDF = spark.read.option("uri",mongoConfig.uri).option("collection",MONGODB_PRODUCT_COLLECTION).format("com.mongodb.spark.sql").load().as[Product].map(x => (x.productId, x.name, x.genres.map(c => if(c == '|') ' ' else c))).toDF("productId", "name", "tags").cache()// 实例化一个分词器,默认按空格分
val tokenizer = new Tokenizer().setInputCol("tags").setOutputCol("words")// 用分词器做转换
val wordsData = tokenizer.transform(productTagsDF)// 定义一个HashingTF工具
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(200)// 用 HashingTF 做处理
val featurizedData = hashingTF.transform(wordsData)// 定义一个IDF工具
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")// 将词频数据传入,得到idf模型(统计文档)
val idfModel = idf.fit(featurizedData)// 用tf-idf算法得到新的特征矩阵
val rescaledData = idfModel.transform(featurizedData)// 从计算得到的 rescaledData 中提取特征向量
val productFeatures = rescaledData.map{case row => ( row.getAs[Int]("productId"),row.getAs[SparseVector]("features").toArray )
}.rdd.map(x => {(x._1, new DoubleMatrix(x._2) )})
- 然后通过商品特征向量进而求出相似度矩阵,就可以在商品详情页给出相似推荐了;通常在电商网站中,用户浏览商品或者购买完成之后,都会显示类似的推荐列表。
- 得到的相似度矩阵也可以为实时推荐提供基础,得到用户推荐列表。可以看出,基于内容和基于隐语义模型,目的都是为了提取出物品的特征向量,从而可以计算出相似度矩阵。而我们的实时推荐系统算法正是基于相似度来定义的。
7.2 基于物品的协同过滤相似推荐
- 基于物品的协同过滤(Item-CF),只需收集用户的常规行为数据(比如点击、收藏、购买)就可以得到商品间的相似度,在实际项目中应用很广。
- 我们的整体思想是,如果两个商品有同样的受众(感兴趣的人群),那么它们就是有内在相关性的。所以可以利用已有的行为数据,分析商品受众的相似程度,进而得出商品间的相似度。我们把这种方法定义为物品的“同现相似度”,公式如下:
- 其中,Ni 是购买商品 i (或对商品 i 评分)的用户列表,Nj 是购买商品 j 的用户列表。
- 核心代码实现如下:
val ratingDF = spark.read.option("uri", mongoConfig.uri).option("collection", MONGODB_RATING_COLLECTION).format("com.mongodb.spark.sql").load().as[Rating].map(x=> (x.userId, x.productId, x.score) ).toDF("userId", "productId", "rating")// 统计每个商品的评分个数,并通过内连接添加到 ratingDF 中val numRatersPerProduct = ratingDF.groupBy("productId").count()val ratingWithCountDF = ratingDF.join(numRatersPerProduct, "productId")// 将商品评分按 userId 两两配对,可以统计两个商品被同一用户做出评分的次数val joinedDF = ratingWithCountDF.join(ratingWithCountDF, "userId").toDF("userId", "product1", "rating1", "count1", "product2", "rating2", "count2").select("userId", "product1", "count1", "product2", "count2")joinedDF.createOrReplaceTempView("joined")val cooccurrenceDF = spark.sql("""|select product1|, product2|, count(userId) as coocount|, first(count1) as count1|, first(count2) as count2|from joined|group by product1, product2""".stripMargin).cache()val simDF = cooccurrenceDF.map{ row =>// 用同现的次数和各自的次数,计算同现相似度val coocSim = cooccurrenceSim( row.getAs[Long]("coocount"), row.getAs[Long]("count1"), row.getAs[Long]("count2") )( row.getAs[Int]("product1"), ( row.getAs[Int]("product2"), coocSim ) )}.rdd.groupByKey().map{case (productId, recs) =>ProductRecs( productId,recs.toList.filter(x=>x._1 != productId).sortWith(_._2>_._2).map(x=>Recommendation(x._1,x._2)).take(MAX_RECOMMENDATION))}.toDF()
- 其中,计算同现相似度的函数代码实现如下:
def cooccurrenceSim(cooCount: Long, count1: Long, count2: Long): Double ={cooCount / math.sqrt( count1 * count2 )}
第8章 程序部署与运行
8.1 发布项目
- 编译项目:执行root项目的clean package阶段
8.2 安装前端项目
- 将website-release.tar.gz解压到/var/www/html目录下,将里面的文件放在根目录
- 启动Apache服务器,访问http://IP:80
8.3 安装业务服务器
- 将BusinessServer.war,放到tomcat的webapp目录下,并将解压出来的文件,放到ROOT目录下:
- 启动Tomcat服务器
8.4 Kafka配置与启动
- 启动Kafka
- 在kafka中创建两个Topic,一个为log,一个为recommender
- 启动kafkaStream程序,用于在log和recommender两个topic之间进行数据格式化。
java -cp kafkastream.jar com.atguigu.kafkastream.Application linux:9092 linux:2181 log recommender
8.5 Flume配置与启动
- 在flume安装目录下的conf文件夹下,创建log-kafka.properties
agent.sources = exectail
agent.channels = memoryChannel
agent.sinks = kafkasink# For each one of the sources, the type is defined
agent.sources.exectail.type = exec
agent.sources.exectail.command = tail -f /home/bigdata/cluster/apache-tomcat-8.5.23/logs/catalina.out
agent.sources.exectail.interceptors=i1
agent.sources.exectail.interceptors.i1.type=regex_filter
agent.sources.exectail.interceptors.i1.regex=.+PRODUCT_RATING_PREFIX.+
# The channel can be defined as follows.
agent.sources.exectail.channels = memoryChannel# Each sink's type must be defined
agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkasink.kafka.topic = log
agent.sinks.kafkasink.kafka.bootstrap.servers = linux:9092
agent.sinks.kafkasink.kafka.producer.acks = 1
agent.sinks.kafkasink.kafka.flumeBatchSize = 20#Specify the channel the sink should use
agent.sinks.kafkasink.channel = memoryChannel# Each channel's type is defined.
agent.channels.memoryChannel.type = memory# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 10000
- 启动flume
bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent
8.6 部署流式计算服务
- 提交SparkStreaming程序:
bin/spark-submit --class com.atguigu.streamingRecommender.StreamingRecommender streamingRecommender-1.0-SNAPSHOT.jar
8.7 Azkaban调度离线算法
- 创建调度项目
- 创建两个job文件:
- Azkaban-stat.job:
type=command
command=/home/bigdata/cluster/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class com.atguigu.offline.RecommenderTrainerAppofflineRecommender-1.0-SNAPSHOT.jar
- Azkaban-offline.job:
type=command
command=/home/bigdata/cluster/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class com.atguigu.statisticsRecommender.StatisticsAppstatisticsRecommender-1.0-SNAPSHOT.jar
- 将Job文件打成ZIP包上传到azkaban:
- 分别为每一个任务定义指定的时间
- 定义完成之后,点击Scheduler即可。