SparkSQL综合案例-省份维度的销售情况统计分析

一、项目背景

二、项目需求

        (1)需求

        ①各省销售指标,每个省份的销售额统计

        ②TOP3销售省份中,有多少家店铺日均销售额1000+

        ③TOP3省份中,各个省份的平均单价

        ④TOP3省份中,各个省份的支付类型比例

        (2)要求

        ①将需求结果写出到mysql

        ②将数据写入到Spark On Hive中

三、代码实现

        (1)需求1:
# cording:utf8
'''
要求1:各省销售额统计
要求2:TOP3销售省份中,有多少店铺达到过日销售额1000+
要求3:TOP3省份中,各省的平均单单价
要求4:TOP3省份中,各个省份的支付类型比例
'''from pyspark.sql import SparkSession
from pyspark.sql import functions as F
if __name__ == '__main__':spark = SparkSession.builder.appName('SQL_text').\master('local[*]').\config('spark.sql.shuffle.partitions', '2').\config('spark.sql.warehouse.dir', 'hdfs://pyspark01/user/hive/warehouse').\config('hive.metastore.uris', 'thrift://pyspark01:9083').\enableHiveSupport().\getOrCreate()# 1.读取数据# 省份信息,缺失值过滤,同时省份信息中会有‘null’字符串# 订单的金额,数据集中有订单的金额是单笔超过10000的,这些事测试数据# 列值过滤(SparkSQL会自动做这个优化)df = spark.read.format('json').load('../../input/mini.json').\dropna(thresh=1, subset=['storeProvince']).\filter("storeProvince != 'null'").\filter('receivable < 10000').\select('storeProvince', 'storeID', 'receivable', 'dateTS', 'payType')# TODO 1:各省销售额统计province_sale_df = df.groupBy('storeProvince').sum('receivable').\withColumnRenamed('sum(receivable)', 'money').\withColumn('money', F.round('money', 2)).\orderBy('money', ascending=False)province_sale_df.show(truncate=False)# 写出到Mysqlprovince_sale_df.write.mode('overwrite').\format('jdbc').\option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8').\option('dbtable', 'province_sale').\option('user', 'root').\option('password', 'root').\option('encoding', 'utf8').\save()# 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive# 会将表写入到hive的数据仓库中province_sale_df.write.mode('overwrite').saveAsTable('default.province_sale', 'parquet')

        结果展示:

        MySQL数据展示:

        Hive数据展示:

        (2)需求2:
# cording:utf8
'''
要求1:各省销售额统计
要求2:TOP3销售省份中,有多少店铺达到过日销售额1000+
要求3:TOP3省份中,各省的平均单单价
要求4:TOP3省份中,各个省份的支付类型比例
'''from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':spark = SparkSession.builder.appName('SQL_text').\master('local[*]').\config('spark.sql.shuffle.partitions', '2').\config('spark.sql.warehouse.dir', 'hdfs://pyspark01/user/hive/warehouse').\config('hive.metastore.uris', 'thrift://pyspark01:9083').\enableHiveSupport().\getOrCreate()# 1.读取数据# 省份信息,缺失值过滤,同时省份信息中会有‘null’字符串# 订单的金额,数据集中有订单的金额是单笔超过10000的,这些事测试数据# 列值过滤(SparkSQL会自动做这个优化)df = spark.read.format('json').load('../../input/mini.json').\dropna(thresh=1, subset=['storeProvince']).\filter("storeProvince != 'null'").\filter('receivable < 10000').\select('storeProvince', 'storeID', 'receivable', 'dateTS', 'payType')# TODO 1:各省销售额统计province_sale_df = df.groupBy('storeProvince').sum('receivable').\withColumnRenamed('sum(receivable)', 'money').\withColumn('money', F.round('money', 2)).\orderBy('money', ascending=False)# # 写出到Mysql# province_sale_df.write.mode('overwrite').\#     format('jdbc').\#     option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8').\#     option('dbtable', 'province_sale').\#     option('user', 'root').\#     option('password', 'root').\#     option('encoding', 'utf8').\#     save()## # 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive# # 会将表写入到hive的数据仓库中# province_sale_df.write.mode('overwrite').saveAsTable('default.province_sale', 'parquet')# TODO 需求2:TOP3销售省份中,有多少店铺达到过日销售额1000+# 2.1 找到TOP3的销售省份top3_province_df = province_sale_df.limit(3).select('storeProvince').\withColumnRenamed('storeProvince', 'top3_province')     # 对列名进行重命名,防止与province_sale_df的storeProvince冲突# 2.2 和原始的DF进行内关联,数据关联后,得到TOP3省份的销售数据top3_province_df_joined = df.join(top3_province_df, on=df['storeProvince'] == top3_province_df['top3_province'])# 因为需要多次使用到TOP3省份数据,所有对其进行持久化缓存top3_province_df_joined.persist(StorageLevel.MEMORY_AND_DISK)# from_unixtime将秒级的日期数据转换为年月日数据# from_unixtime的精度是秒级,数据的精度是毫秒级,需要对数据进行进度的裁剪province_hot_store_count_df = top3_province_df_joined.groupBy("storeProvince", "storeID",F.from_unixtime(df['dateTS'].substr(0, 10), "yyyy-mm-dd").alias('day')).\sum('receivable').withColumnRenamed('sum(receivable)', 'money').\filter('money > 1000 ').\dropDuplicates(subset=['storeID']).\groupBy('storeProvince').count()province_hot_store_count_df.show()# 写出到Mysqlprovince_sale_df.write.mode('overwrite'). \format('jdbc'). \option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8'). \option('dbtable', 'province_hot_store_count'). \option('user', 'root'). \option('password', 'root'). \option('encoding', 'utf8'). \save()# 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive# 会将表写入到hive的数据仓库中province_sale_df.write.mode('overwrite').saveAsTable('default.province_hot_store_count', 'parquet')top3_province_df_joined.unpersist()

        结果展示:

        MySQL结果展示:

        Hive结果展示:

        (3)需求3:
# cording:utf8
'''
要求1:各省销售额统计
要求2:TOP3销售省份中,有多少店铺达到过日销售额1000+
要求3:TOP3省份中,各省的平均单单价
要求4:TOP3省份中,各个省份的支付类型比例
'''from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevelif __name__ == '__main__':spark = SparkSession.builder.appName('SQL_text').\master('local[*]').\config('spark.sql.shuffle.partitions', '2').\config('spark.sql.warehouse.dir', 'hdfs://pyspark01/user/hive/warehouse').\config('hive.metastore.uris', 'thrift://pyspark01:9083').\enableHiveSupport().\getOrCreate()# 1.读取数据# 省份信息,缺失值过滤,同时省份信息中会有‘null’字符串# 订单的金额,数据集中有订单的金额是单笔超过10000的,这些事测试数据# 列值过滤(SparkSQL会自动做这个优化)df = spark.read.format('json').load('../../input/mini.json').\dropna(thresh=1, subset=['storeProvince']).\filter("storeProvince != 'null'").\filter('receivable < 10000').\select('storeProvince', 'storeID', 'receivable', 'dateTS', 'payType')# TODO 1:各省销售额统计province_sale_df = df.groupBy('storeProvince').sum('receivable').\withColumnRenamed('sum(receivable)', 'money').\withColumn('money', F.round('money', 2)).\orderBy('money', ascending=False)# # 写出到Mysql# province_sale_df.write.mode('overwrite').\#     format('jdbc').\#     option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8').\#     option('dbtable', 'province_sale').\#     option('user', 'root').\#     option('password', 'root').\#     option('encoding', 'utf8').\#     save()## # 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive# # 会将表写入到hive的数据仓库中# province_sale_df.write.mode('overwrite').saveAsTable('default.province_sale', 'parquet')# TODO 需求2:TOP3销售省份中,有多少店铺达到过日销售额1000+# 2.1 找到TOP3的销售省份top3_province_df = province_sale_df.limit(3).select('storeProvince').\withColumnRenamed('storeProvince', 'top3_province')     # 对列名进行重命名,防止与province_sale_df的storeProvince冲突# 2.2 和原始的DF进行内关联,数据关联后,得到TOP3省份的销售数据top3_province_df_joined = df.join(top3_province_df, on=df['storeProvince'] == top3_province_df['top3_province'])# 因为需要多次使用到TOP3省份数据,所有对其进行持久化缓存top3_province_df_joined.persist(StorageLevel.MEMORY_AND_DISK)# from_unixtime将秒级的日期数据转换为年月日数据# from_unixtime的精度是秒级,数据的精度是毫秒级,需要对数据进行进度的裁剪province_hot_store_count_df = top3_province_df_joined.groupBy("storeProvince", "storeID",F.from_unixtime(df['dateTS'].substr(0, 10), "yyyy-mm-dd").alias('day')).\sum('receivable').withColumnRenamed('sum(receivable)', 'money').\filter('money > 1000 ').\dropDuplicates(subset=['storeID']).\groupBy('storeProvince').count()province_hot_store_count_df.show()# # 写出到Mysql# province_hot_store_count_df.write.mode('overwrite'). \#     format('jdbc'). \#     option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8'). \#     option('dbtable', 'province_hot_store_count'). \#     option('user', 'root'). \#     option('password', 'root'). \#     option('encoding', 'utf8'). \#     save()## # 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive# # 会将表写入到hive的数据仓库中# province_hot_store_count_df.write.mode('overwrite').saveAsTable('default.province_hot_store_count', 'parquet')# TODO 3:TOP3省份中,各省的平均单单价top3_province_order_avg_df = top3_province_df_joined.groupBy("storeProvince").\avg("receivable").\withColumnRenamed("avg(receivable)", "money").\withColumn("money", F.round("money", 2)).\orderBy("money", ascending=False)top3_province_order_avg_df.show(truncate=False)# 写出到Mysqltop3_province_order_avg_df.write.mode('overwrite'). \format('jdbc'). \option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8'). \option('dbtable', 'top3_province_order_avg'). \option('user', 'root'). \option('password', 'root'). \option('encoding', 'utf8'). \save()# 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive# 会将表写入到hive的数据仓库中top3_province_order_avg_df.write.mode('overwrite').saveAsTable('default.top3_province_order_avg', 'parquet')top3_province_df_joined.unpersist()

        结果展示

        MySQL与Hive结果展示:

        (4)需求4:
# cording:utf8
'''
要求1:各省销售额统计
要求2:TOP3销售省份中,有多少店铺达到过日销售额1000+
要求3:TOP3省份中,各省的平均单单价
要求4:TOP3省份中,各个省份的支付类型比例
'''from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import StringType
if __name__ == '__main__':spark = SparkSession.builder.appName('SQL_text').\master('local[*]').\config('spark.sql.shuffle.partitions', '2').\config('spark.sql.warehouse.dir', 'hdfs://pyspark01/user/hive/warehouse').\config('hive.metastore.uris', 'thrift://pyspark01:9083').\enableHiveSupport().\getOrCreate()# 1.读取数据# 省份信息,缺失值过滤,同时省份信息中会有‘null’字符串# 订单的金额,数据集中有订单的金额是单笔超过10000的,这些事测试数据# 列值过滤(SparkSQL会自动做这个优化)df = spark.read.format('json').load('../../input/mini.json').\dropna(thresh=1, subset=['storeProvince']).\filter("storeProvince != 'null'").\filter('receivable < 10000').\select('storeProvince', 'storeID', 'receivable', 'dateTS', 'payType')# TODO 1:各省销售额统计province_sale_df = df.groupBy('storeProvince').sum('receivable').\withColumnRenamed('sum(receivable)', 'money').\withColumn('money', F.round('money', 2)).\orderBy('money', ascending=False)# # 写出到Mysql# province_sale_df.write.mode('overwrite').\#     format('jdbc').\#     option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8').\#     option('dbtable', 'province_sale').\#     option('user', 'root').\#     option('password', 'root').\#     option('encoding', 'utf8').\#     save()## # 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive# # 会将表写入到hive的数据仓库中# province_sale_df.write.mode('overwrite').saveAsTable('default.province_sale', 'parquet')# TODO 需求2:TOP3销售省份中,有多少店铺达到过日销售额1000+# 2.1 找到TOP3的销售省份top3_province_df = province_sale_df.limit(3).select('storeProvince').\withColumnRenamed('storeProvince', 'top3_province')     # 对列名进行重命名,防止与province_sale_df的storeProvince冲突# 2.2 和原始的DF进行内关联,数据关联后,得到TOP3省份的销售数据top3_province_df_joined = df.join(top3_province_df, on=df['storeProvince'] == top3_province_df['top3_province'])# 因为需要多次使用到TOP3省份数据,所有对其进行持久化缓存top3_province_df_joined.persist(StorageLevel.MEMORY_AND_DISK)# from_unixtime将秒级的日期数据转换为年月日数据# from_unixtime的精度是秒级,数据的精度是毫秒级,需要对数据进行进度的裁剪province_hot_store_count_df = top3_province_df_joined.groupBy("storeProvince", "storeID",F.from_unixtime(df['dateTS'].substr(0, 10), "yyyy-mm-dd").alias('day')).\sum('receivable').withColumnRenamed('sum(receivable)', 'money').\filter('money > 1000 ').\dropDuplicates(subset=['storeID']).\groupBy('storeProvince').count()province_hot_store_count_df.show()# # 写出到Mysql# province_hot_store_count_df.write.mode('overwrite'). \#     format('jdbc'). \#     option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8'). \#     option('dbtable', 'province_hot_store_count'). \#     option('user', 'root'). \#     option('password', 'root'). \#     option('encoding', 'utf8'). \#     save()## # 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive# # 会将表写入到hive的数据仓库中# province_hot_store_count_df.write.mode('overwrite').saveAsTable('default.province_hot_store_count', 'parquet')# TODO 3:TOP3省份中,各省的平均单单价top3_province_order_avg_df = top3_province_df_joined.groupBy("storeProvince").\avg("receivable").\withColumnRenamed("avg(receivable)", "money").\withColumn("money", F.round("money", 2)).\orderBy("money", ascending=False)top3_province_order_avg_df.show(truncate=False)# # 写出到Mysql# top3_province_order_avg_df.write.mode('overwrite'). \#     format('jdbc'). \#     option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8'). \#     option('dbtable', 'top3_province_order_avg'). \#     option('user', 'root'). \#     option('password', 'root'). \#     option('encoding', 'utf8'). \#     save()## # 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive# # 会将表写入到hive的数据仓库中# top3_province_order_avg_df.write.mode('overwrite').saveAsTable('default.top3_province_order_avg', 'parquet')# TODO 4:TOP3省份中,各个省份的支付类型比例top3_province_df_joined.createTempView("province_pay")# 自定义UDFdef udf_func(percent):return str(round(percent * 100)) + "%"# 注册UDFmy_udf = F.udf(udf_func, StringType())pay_type_df = spark.sql('''SELECT storeProvince, payType, (count(payType) / total) AS percent FROM (SELECT storeProvince, payType, count(1) OVER(PARTITION BY storeProvince) AS total FROM province_pay) AS subGROUP BY storeProvince, payType, total''').withColumn('percent', my_udf("percent"))pay_type_df.show()# 写出到Mysqlpay_type_df.write.mode('overwrite'). \format('jdbc'). \option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=False&useUnicode=true&characterEncoding=utf8'). \option('dbtable', 'pay_type'). \option('user', 'root'). \option('password', 'root'). \option('encoding', 'utf8'). \save()# 写出Hive表 saveAsTable 可以写出表 要求已经配置好spark on hive# 会将表写入到hive的数据仓库中top3_province_order_avg_df.write.mode('overwrite').saveAsTable('default.pay_type', 'parquet')top3_province_df_joined.unpersist()

       结果展示:

       MySQL结果展示:

        Hive结果展示:

四、项目运行问题及解决方法

        报错:java.sql.BatchUpdateException: Incorrect string value: '\xE6\xB1\x9F\xE8\xA5\xBF...' for column 'storeProvince' atrow1

        原因:MySQL的UTF-8只支持3个字节的unicode字符,无法支持四个字节的Unicode字符

        解决办法:在MySQL控制台执行下列代码修改编码格式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/151851.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Apollo 快速上手指南:打造自动驾驶解决方案

快速上手 概述云端体验登录云端仿真环境 打开DreamView播放离线数据包PNC Monitor 内置的数据监视器cyber_monitor 实时通道信息视图福利活动 主页传送门&#xff1a;&#x1f4c0; 传送 概述 Apollo 开放平台是一个开放的、完整的、安全的平台&#xff0c;将帮助汽车行业及自…

Go学习第十三章——Gin(入门与路由)

Go web框架——Gin&#xff08;入门与路由&#xff09; 1 Gin框架介绍1.1 基础介绍1.2 安装Gin1.3 快速使用 2 路由2.1 基本路由GET请求POST请求 2.2 路由参数2.3 路由分组基本分组带中间件的分组 2.4 重定向 1 Gin框架介绍 github链接&#xff1a;https://github.com/gin-gon…

如何和安装Windows10系统教程(最新最详细)

目录 一.简介 二.安装步骤 软件&#xff1a;Windows 10版本&#xff1a;1909语言&#xff1a;简体中文大小&#xff1a;4.95G安装环境&#xff1a;Win10/Win8/Win7(64位&#xff09;硬件要求&#xff1a;CPU2.0GHz 内存4G(或更高&#xff09;下载通道①丨百度网盘&#xff1a…

【蓝桥每日一题]-前缀和与差分(保姆级教程 篇2)#差分序列

昨天讲的概念和模板&#xff0c;今天讲一个差分序列的好题(好好体会里面的优化思想)&#xff1a; 目录 题目&#xff1a; 思路&#xff1a; 题目&#xff1a; 手动打出样例哈 输入&#xff1a; 输出&#xff1a; 4 …

MySQL -- 表的约束

MySQL – 表的约束 文章目录 MySQL -- 表的约束一、表的约束1.空属性2.默认值3.列描述4.zerofill5.主键6.自增长7.唯一键8.外键 一、表的约束 真正约束字段的是数据类型&#xff0c;但是数据类型约束很单一&#xff0c;需要有一些额外的约束&#xff0c;更好的保证数据的合 法…

Python 编写确定个位、十位以上方法及各数位的和程序

Python 编写确定数字位方法 Python 编写确定个位、十位Python 编写确定个位、十位、百位方法解析&#xff1a;Python 各数位的和程序 利用%&#xff08;取余符号&#xff09;、//&#xff08;整除&#xff09;符号。 Python 编写确定个位、十位 num 17 a num % 10 b num /…

pytorch 入门 (五)案例三:乳腺癌识别识别-VGG16实现

本文为&#x1f517;小白入门Pytorch内部限免文章 &#x1f368; 本文为&#x1f517;小白入门Pytorch中的学习记录博客&#x1f366; 参考文章&#xff1a;【小白入门Pytorch】乳腺癌识别&#x1f356; 原作者&#xff1a;K同学啊 在本案例中&#xff0c;我将带大家探索一下深…

2023年正版win10/win11系统安装教学(纯净版)

第一步&#xff1a;准备一个8G容量以上的U盘。 注意&#xff0c;在制作系统盘时会格式化U盘&#xff0c;所以最好准备个空U盘&#xff0c;防止资料丢失。 第二步&#xff1a;制作系统盘。 安装win10 进入windows官网 官网win10下载地址&#xff1a;https://www.microsoft.c…

OpenCV学习(六)——图像算术运算(加法、融合与按位运算)

图像算术运算 6. 图像算术运算6.1 图像加法6.2 图像融合6.3 按位运算 6. 图像算术运算 6.1 图像加法 OpenCV加法是饱和运算Numpy加法是模运算 import cv2 import numpy as npx np.uint8([250]) y np.uint8([10])# OpenCV加法 print(cv2.add(x, y)) # 25010 260 > 255…

基于springboot零食商城管理系统

功能如图所示 摘要 这基于Spring Boot的零食商城管理系统提供了强大的购物车和订单管理功能。用户可以在系统中浏览零食产品&#xff0c;并将它们添加到购物车中。购物车可以保存用户的选购商品&#xff0c;允许随时查看已选择的商品和它们的数量。一旦用户满意&#xff0c;他们…

并发编程 - 并发可见性,原子性,有序性 与 JMM内存模型

1. 并发三大特性 并发编程Bug的源头&#xff1a; 原子性 、 可见性 和 有序性 问题 1.1 原子性 一个或多个操作&#xff0c;要么全部执行且在执行过程中不被任何因素打断&#xff0c;要么全部不执行。 在 Java 中&#xff0c;对基本数据类型的变量的读取和赋值操作是原子性操…