发布日期:2022-04-12 VIP内容

赛题模拟实现-离线数据统计

具体内容

使用Scala语言编写程序获取ODS层指定分区表中的数据进行清洗,并完成销量前5的商品统计、某月的总销售额统计、指定月份的销售额统计、各用户在线总时长统计,并将统计后的数据存入MySQL数据库中。

实现原理

数据仓库概念图:

实现过程

1) 数据清洗任务。

数据清洗包括:

  • 去重
  • 空值判断与填充

因本案例所选数据集为Kaggle上的数据集,无重复数据和缺失字段,所以不需要去重和空值处理,因此省略数据清洗一项。

如果大家希望学习数据清洗的方法,可以参考以下相关内容:

2) 数据转换

数据转换

  • 某月的总销售额统计;某几个月的销售额统计;
  • 增加一个order_purchase_month列,以便按月统计;
  • 增加一个销售金额列,它是一个计算列,通常是商品项的单价*数量。不过本例没有数量。我们姑且计算价格+运费吧。

数据整合:销量前5的商品统计:商品在商品维表,销量在订单项表中。对两张表执行join。

整理的结果,写出到Hive数仓的DWS层。

实现代码如下:

package com.bigdataplay2021

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object TransformJob {
  def main(args: Array[String]): Unit = {
    // 在windows下开发时设置
    System.setProperty("HADOOP_USER_NAME", "hduser")

    // 创建SparkSession
    val spark = SparkSession.builder()
      //      .master("local[*]")
      .appName("bigdata play")
      .enableHiveSupport()
      .getOrCreate()

    // 打开Hive动态分区的标志
    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

    // 数据转换和整合
    transformToDWS(spark)
  }

  // 数据转换和整合
  def transformToDWS(spark: SparkSession) = {
    // 从ODS表加载订单表到DataFrame
    val ordersDF = spark.table("olistdb.ods_orders")

    // 从ODS表加载订单项表到DataFrame
    val orderItemsDF = spark.table("olistdb.ods_order_items")

    // 先join订单表和订单明细表
    val ordersAndItemsDF = ordersDF.join(orderItemsDF,"order_id")

    import spark.implicits._

    // 增加order_purchase_month列和amount列
    val ordersAndItemsDF2 = ordersAndItemsDF.
      withColumn("order_purchase_month", date_format($"order_purchase_timestamp","yyyy-MM")).
      withColumn("amount", $"price" + $"freight_value")

    // 将整理后的DataFrame写回Hive的DWS层,并按order_purchase_month列分区存储
    EtlUtil.loadToHive(spark,ordersAndItemsDF2,"olistdb","dws_ordersAndItems",Some("order_purchase_month"))

    // 测试写入
    spark.table("olistdb.dws_ordersAndItems").show(5)
  }
}

3) 数据统计。

要求:

  • 1) 销量前5的商品统计,并将统计后的数据存入MySQL数据库中。
  • 2) 某月的总销售额统计,并将统计后的数据存入MySQL数据库中。
  • 3) 指定月份的销售额统计,并将统计后的数据存入MySQL数据库中。/li>
  • 4) 各用户在线总时长统计,并将统计后的数据存入MySQL数据库中。

实现代码:

package com.bigdataplay2021

import org.apache.spark.sql._
import org.apache.spark.sql.functions.broadcast

object AnalysisJob {
  def main(args: Array[String]): Unit = {
    // 在windows下开发时设置
    System.setProperty("HADOOP_USER_NAME", "hduser")

    // SparkSession实例
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("bigdata play")
      .getOrCreate()

    /* 从Hive表读取数据 */
    // 首先加载dws表
    val df = spark.table("olistdb.dws_ordersanditems")

    /* 注册为临时视图 */
    df.createOrReplaceTempView("orders")

    // 1)统计销量前5的商品并存入MySQL
    top5Items(spark)
    
    // 2) 某月的总销售额统计。例如,统计2017年12月份的总销售额
    SalesAmountByMonth(spark)

    // 3) 指定月份的销售额统计,并将统计后的数据存入MySQL数据库中。
    // 例如,统计2017年10-12月份的总销售额
    SalesAmountByMonths(spark)
  }

  // 1) 销量前5的商品统计
  def top5Items(spark: SparkSession) = {
    // 分析:按商品id分组统计金额
    val top5 = spark.sql(
      """
        |select product_id, count(1) as cnt
        |from orders
        |group by product_id
        |order by cnt desc
        |limit 5
        |""".stripMargin)

    // 如果简单要求,则直接将结果写出到mysql
    // EtlUtil.exportToMysql(top5, "top5_tb")

    // 如果还要求这5个商品的信息,则需要和维表join
    import spark.implicits._
    // 找出这5个商品的id
    val top5_product_ids = top5.select("product_id").collect().map(_(0).toString)
    // 执行维表关联
    val top5_with_info = spark.
      // 加载维表
      table("olistdb.dim_products").
      // 先从维表中把这5个id对应的商品信息找出来
      where($"product_id".isin(top5_product_ids:_*)).
      // 执行broadcast join
      join(broadcast(top5), "product_id")

    // 写出到mysql
    EtlUtil.exportToMysql(top5_with_info, "top5_tb")

  }

  // 2) 某月的总销售额统计。例如,统计2017年12月份的总销售额
  def SalesAmountByMonth(spark: SparkSession) = {
    val amountOfMonth = spark.sql(
      """
        |select order_purchase_month, sum(amount) as total_amount
        |from orders
        |group by order_purchase_month
        |having order_purchase_month=="2017-12"
        |""".stripMargin)

    // 写出到mysql
    EtlUtil.exportToMysql(amountOfMonth, "amount_of_month")
  }

  // 3) 指定月份的销售额统计,并将统计后的数据存入MySQL数据库中。
  // 例如,统计2017年10-12月份的总销售额
  def SalesAmountByMonths(spark: SparkSession) = {
    val amountOfMonths = spark.sql(
      """
        |select sum(amount) as total_amount
        |from orders
        |where order_purchase_month in ("2017-10","2017-11","2017-12")
        |""".stripMargin)

    // 写出到mysql
    EtlUtil.exportToMysql(amountOfMonths, "amount_of_months")
  }

}

4) 项目打包。

将项目打包jar包以便部署。打包命令(在IDEA Terminal终端窗口中执行):

$ mvn clean package

5)项目部署

首先启动Hive Metastore服务,并保持运行(不要关闭):

$ hive --service metastore

将项目部署到Spark上运行,部署命令:

-- 先执行数据转换
$ spark-submit --class com.bigdataplay2021.TransformJob hellospark-1.0-SNAPSHOT.jar

-- 再执行数据统计分析和导出
$ spark-submit --class com.bigdataplay2021.AnalysisJob hellospark-1.0-SNAPSHOT.jar