写了一些使用sparksql以及spark机器学习来进行数据分析的东西,希望能给大家做一些参考
项目需求:对某大型商超客户采购数据集进行数据分析
数据来源:https://www.heywhale.com/mw/dataset/656069b19a74cc18269207c4/content
首先使用Spark读入数据集,读入文件前要先将文件转为csv格式
val sparConf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(sparConf) //建立Spark连接
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.option("header", "true").csv("E:\\ShuJu\\sparkinput\\SuperStore\\SuperStore.csv") //读入文件
总体销售情况
df.agg(count("Order ID"), sum("Quantity"), sum("Sales"), sum("Profit")).show()
//对数据进行一次总体统计
5.1万个订单,销售商品总数为19.8万件,销售额1261.8万刀,利润146.7万刀
对各个地区市场的销售情况进行统计
df.groupBy("Market").agg(count("Order ID"), sum("Quantity"), sum("Sales"), sum("Profit")).show()
//对各个地区市场的销售情况进行统计
按照国家进行分组,按照订单个数进行递减排序,获取订单量前10的国家
val TopCountry = df.groupBy("Country").count() //提取出国家-订单个数的表格
TopCountry.sort(TopCountry("count").desc).show(10) //输出订单个数前10的国家
从图中可以看出美国对该商超的需求比较大,断层领先的第一名,需要优先对其客户进行关注。
对订单数排名前五的国家进行一次统计
val sumOrder = df.count() //sumOrder为所有订单个数的总和
val sumResult = TopCountry.select("count") //sumResult为订单数前五名国家的订单数总和.filter("count > 2000").rdd.map(row => row(0).asInstanceOf[Long]).reduce((a, b) => a + b)
val headConsumer = 1.0*sumResult/sumOrder
println("订单数前五名的国家在所有订单数中的占比为"+headConsumer*100+"%")
前五名为美国、澳大利亚、法国、墨西哥、德国,在全球总订单数中占比为39.7%
对订单商品类别进行查看
df.groupBy("Category").count().show() //查看所有产品类别
df.groupBy("Sub-Category").count().show() //查看所有子类别
在该数据集中共有三种产品类别,其中“办公用品”的销量最高,共产生了3万件订单,其次是“科技产品”和“家具”,分别有1万件和9千件
对产品类别下面的子类别进行统计
val SubCategory = df.groupBy("Sub-Category").count() //提取出子类别-订单个数的表格
SubCategory.sort(SubCategory("count").desc).show(10) //查看子类别中,销量最高的前十名
从销量榜上来看,粘结剂,储物盒,艺术品,纸张,椅子,手机等具有很高的销量数据。
对每一个子类别的总销售额进行统计分析
val SubCategorySale = df.select("Sub-Category", "Sales", "Quantity").withColumn("ASale", df("Sales") * df("Quantity")) //计算每一个类别的总销售额.groupBy("Sub-Category") //以类别进行分组.sum("ASale") //提取出子类别-销售额的表格SubCategorySale.sort(SubCategorySale("sum(ASale)").desc) //对每一个子类别的总销售额进行排序.withColumnRenamed("sum(ASale)", "AllSale") //重命名列名.show(10) //查看子类别中,销量额最高的前十名
从图片中可以看出手机、椅子、复印机、书架、储物盒、家用电器、配件、机器、桌子、文件夹的销售额从高到低,占有前十名
机器学习部分
这里的数据使用了前一千条来进行测试
对销售额进行预测
数据处理
val cleanedData = df.na.fill(Map("Postal Code" -> "Unknown","Sales" -> "0","Profit" -> "0","Quantity" -> "0","Shipping Cost" -> "0","Discount" -> "0"))// 转换为数值类型val transformedData = cleanedData.withColumn("Sales", col("Sales").cast("Double")).withColumn("Profit", col("Profit").cast("Double")).withColumn("Quantity", col("Quantity").cast("Double")).withColumn("Shipping Cost", col("Shipping Cost").cast("Double")).withColumn("Discount", col("Discount").cast("Double"))// 检查转换后的数据类型transformedData.printSchema()// 删除包含 null 值的行val nonNullData = transformedData.na.drop(Seq("Quantity", "Shipping Cost", "Discount"))
回归预测
// 特征选择val featureCols = Array("Quantity", "Shipping Cost", "Discount")val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")val featureData = assembler.transform(nonNullData)// 划分数据集val Array(trainingData, testData) = featureData.randomSplit(Array(0.8, 0.2))// 训练模型val lr = new LinearRegression().setLabelCol("Sales").setFeaturesCol("features")val model = lr.fit(trainingData)// 进行预测并查看结果val predictions = model.transform(testData)predictions.select("features", "Sales", "prediction").show()// 评估模型val trainingSummary = model.summaryprintln(s"RMSE: ${trainingSummary.rootMeanSquaredError}")println(s"R2: ${trainingSummary.r2}")
sc.stop()//关闭Spark连接
手动关键词
Spark数据分析,Spark项目,Spark机器学习,sparksql,大数据