发布日期:2022-04-12
VIP内容
赛题模拟实现-离线数据统计
具体内容
使用Scala语言编写程序获取ODS层指定分区表中的数据进行清洗,并完成销量前5的商品统计、某月的总销售额统计、指定月份的销售额统计、各用户在线总时长统计,并将统计后的数据存入MySQL数据库中。
实现原理
数据仓库概念图:
实现过程
1) 数据清洗任务。
数据清洗包括:
- 去重
- 空值判断与填充
因本案例所选数据集为Kaggle上的数据集,无重复数据和缺失字段,所以不需要去重和空值处理,因此省略数据清洗一项。
如果大家希望学习数据清洗的方法,可以参考以下相关内容:
- 示例_数据缺失值处理;
- 示例_数据整合、清洗与转换;
- 小白学苑案例双十一美妆数据分析”中的数据清洗部分实现。
2) 数据转换
数据转换
- 某月的总销售额统计;某几个月的销售额统计;
- 增加一个order_purchase_month列,以便按月统计;
- 增加一个销售金额列,它是一个计算列,通常是商品项的单价*数量。不过本例没有数量。我们姑且计算价格+运费吧。
数据整合:销量前5的商品统计:商品在商品维表,销量在订单项表中。对两张表执行join。
整理的结果,写出到Hive数仓的DWS层。
实现代码如下:
package com.bigdataplay2021
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object TransformJob {
def main(args: Array[String]): Unit = {
// 在windows下开发时设置
System.setProperty("HADOOP_USER_NAME", "hduser")
// 创建SparkSession
val spark = SparkSession.builder()
// .master("local[*]")
.appName("bigdata play")
.enableHiveSupport()
.getOrCreate()
// 打开Hive动态分区的标志
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// 数据转换和整合
transformToDWS(spark)
}
// 数据转换和整合
def transformToDWS(spark: SparkSession) = {
// 从ODS表加载订单表到DataFrame
val ordersDF = spark.table("olistdb.ods_orders")
// 从ODS表加载订单项表到DataFrame
val orderItemsDF = spark.table("olistdb.ods_order_items")
// 先join订单表和订单明细表
val ordersAndItemsDF = ordersDF.join(orderItemsDF,"order_id")
import spark.implicits._
// 增加order_purchase_month列和amount列
val ordersAndItemsDF2 = ordersAndItemsDF.
withColumn("order_purchase_month", date_format($"order_purchase_timestamp","yyyy-MM")).
withColumn("amount", $"price" + $"freight_value")
// 将整理后的DataFrame写回Hive的DWS层,并按order_purchase_month列分区存储
EtlUtil.loadToHive(spark,ordersAndItemsDF2,"olistdb","dws_ordersAndItems",Some("order_purchase_month"))
// 测试写入
spark.table("olistdb.dws_ordersAndItems").show(5)
}
}
3) 数据统计。
要求:
- 1) 销量前5的商品统计,并将统计后的数据存入MySQL数据库中。
- 2) 某月的总销售额统计,并将统计后的数据存入MySQL数据库中。
- 3) 指定月份的销售额统计,并将统计后的数据存入MySQL数据库中。/li>
- 4) 各用户在线总时长统计,并将统计后的数据存入MySQL数据库中。
实现代码:
package com.bigdataplay2021
import org.apache.spark.sql._
import org.apache.spark.sql.functions.broadcast
object AnalysisJob {
def main(args: Array[String]): Unit = {
// 在windows下开发时设置
System.setProperty("HADOOP_USER_NAME", "hduser")
// SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("bigdata play")
.getOrCreate()
/* 从Hive表读取数据 */
// 首先加载dws表
val df = spark.table("olistdb.dws_ordersanditems")
/* 注册为临时视图 */
df.createOrReplaceTempView("orders")
// 1)统计销量前5的商品并存入MySQL
top5Items(spark)
// 2) 某月的总销售额统计。例如,统计2017年12月份的总销售额
SalesAmountByMonth(spark)
// 3) 指定月份的销售额统计,并将统计后的数据存入MySQL数据库中。
// 例如,统计2017年10-12月份的总销售额
SalesAmountByMonths(spark)
}
// 1) 销量前5的商品统计
def top5Items(spark: SparkSession) = {
// 分析:按商品id分组统计金额
val top5 = spark.sql(
"""
|select product_id, count(1) as cnt
|from orders
|group by product_id
|order by cnt desc
|limit 5
|""".stripMargin)
// 如果简单要求,则直接将结果写出到mysql
// EtlUtil.exportToMysql(top5, "top5_tb")
// 如果还要求这5个商品的信息,则需要和维表join
import spark.implicits._
// 找出这5个商品的id
val top5_product_ids = top5.select("product_id").collect().map(_(0).toString)
// 执行维表关联
val top5_with_info = spark.
// 加载维表
table("olistdb.dim_products").
// 先从维表中把这5个id对应的商品信息找出来
where($"product_id".isin(top5_product_ids:_*)).
// 执行broadcast join
join(broadcast(top5), "product_id")
// 写出到mysql
EtlUtil.exportToMysql(top5_with_info, "top5_tb")
}
// 2) 某月的总销售额统计。例如,统计2017年12月份的总销售额
def SalesAmountByMonth(spark: SparkSession) = {
val amountOfMonth = spark.sql(
"""
|select order_purchase_month, sum(amount) as total_amount
|from orders
|group by order_purchase_month
|having order_purchase_month=="2017-12"
|""".stripMargin)
// 写出到mysql
EtlUtil.exportToMysql(amountOfMonth, "amount_of_month")
}
// 3) 指定月份的销售额统计,并将统计后的数据存入MySQL数据库中。
// 例如,统计2017年10-12月份的总销售额
def SalesAmountByMonths(spark: SparkSession) = {
val amountOfMonths = spark.sql(
"""
|select sum(amount) as total_amount
|from orders
|where order_purchase_month in ("2017-10","2017-11","2017-12")
|""".stripMargin)
// 写出到mysql
EtlUtil.exportToMysql(amountOfMonths, "amount_of_months")
}
}
4) 项目打包。
将项目打包jar包以便部署。打包命令(在IDEA Terminal终端窗口中执行):
$ mvn clean package
5)项目部署
首先启动Hive Metastore服务,并保持运行(不要关闭):
$ hive --service metastore
将项目部署到Spark上运行,部署命令:
-- 先执行数据转换 $ spark-submit --class com.bigdataplay2021.TransformJob hellospark-1.0-SNAPSHOT.jar -- 再执行数据统计分析和导出 $ spark-submit --class com.bigdataplay2021.AnalysisJob hellospark-1.0-SNAPSHOT.jar