Spark SQL编程案例1_电商数据分析

本节通过几个案例的学习,掌握使用Spark SQL进行大数据分析的复杂用法。

【示例】电商数据分析示例。使用nw数据集,回答以下问题:

  • 每个客户下了多少订单?
  • 每个国家的订单有多少?
  • 每月/年有多少订单?
  • 每个客户的年销售总额是多少?
  • 客户每年的平均订单是多少?

实现代码如下所示:

def main(args: Array[String]): Unit = {
    // 0) 创建SparkSession的实例
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Spark Basic Example")
      .getOrCreate()

    // 1、读取源数据文件
    val filePath = "src/main/resources/"

    // 加载订单数据集到DataFrame中
    val orders = spark.read
      .option("header","true")
      .option("inferSchema","true")
      .csv(filePath + "NW-Orders-01.csv")

    println("订单有" + orders.count() + "行")
    orders.printSchema()
    orders.show(3)

    // 加载订单明细数据集到DataFrame中
    val orderDetails = spark.read
      .option("header","true")
      .option("inferSchema","true")
      .csv(filePath + "NW-Order-Details.csv")

    println("订单明细有" + orderDetails.count() + "行")
    orderDetails.printSchema()
    orderDetails.show(3)

    // 2、每个客户下了多少订单?
    val orderByCustomer = orders.groupBy("CustomerID").count()
    orderByCustomer.sort(col("count").desc).show(3)

    // 3、每个国家的订单有多少?
    val orderByCountry = orders.groupBy("ShipCountry").count()
    orderByCountry.sort(col("count").desc).show(3)

    //# 对于后面三个问题,需要对数据进行一些转换
    //# 1. 向Orders DataFrame增加一个OrderTotal列
    //#    1.1. 计算每个订单明细的实际金额
    //#    1.2. 根据order id统计每张订单的总金额
    //#    1.3. 对order details & orders进行等值内连接,增加订单总金额
    //#    1.4. 检查是否有任何null列
    //# 2. 增加一个date列
    //# 3. 增加month和year

    //# 1.1. 向order details中增加每行的小计(每个订单明细的实际金额)
    import spark.implicits._
    val orderDetails1 = orderDetails
      .select($"OrderID",(($"UnitPrice" * $"Qty") - ($"UnitPrice" * $"Qty") * $"Discount").as("OrderPrice"))
    orderDetails1.show(5)

    //# 1.2. 根据order id统计每张订单的总金额
    // val orderTot = orderDetails1.groupBy("OrderID").sum("OrderPrice").alias("OrderTotal")
    val orderTot = orderDetails1.groupBy("OrderID").agg(sum("OrderPrice").as("OrderTotal"))
    // orderTot.sort("OrderID").show(5)
    // orderTot.sort($"OrderTotal".desc).show(5)
    orderTot.select($"OrderID",bround($"OrderTotal",2)).sort("OrderID").show(5)

    //# 1.3. 对order details & orders进行等值内连接,增加订单总金额
    val orders1 = orders
      .join(orderTot, orders("OrderID").equalTo(orderTot("OrderID")), "inner")
      .select(
        orders("OrderID"),
        orders("CustomerID"),
        orders("OrderDate"),
        orders("ShipCountry").alias("ShipCountry"),
        orderTot("OrderTotal").alias("Total")
      )
    orders1.sort("CustomerID").show()

    // # 1.4. 检查是否有任何null列
    orders1.filter(orders1("Total").isNull).show()

    // # 2. 增加一个date列
    val orders2 = orders1.withColumn("Date",to_date(orders1("OrderDate")))
    orders2.printSchema()
    orders2.show(2)

    // # 3. 增加month和year
    val orders3 = orders2.withColumn("Month",month($"OrderDate")).withColumn("Year",year($"OrderDate"))
    orders3.show(2)

    // Q 4. 每月/年有多少订单金额?
    val ordersByYM = orders3.groupBy("Year","Month").agg(sum("Total").as("Total"))
    ordersByYM.select($"Year",$"Month",bround($"Total",2) as "Total").sort($"Year",$"Month").show()

    // Q 5. 每个客户的年销售总额是多少?
    var ordersByCY = orders3.groupBy("CustomerID","Year").agg(sum("Total").as("Total"))
    ordersByCY.sort($"CustomerID",$"Year").show()

    // Q 6. 客户每年的平均订单是多少?
    ordersByCY = orders3.groupBy("CustomerID","Year").agg(avg("Total").as("Avg"))
//    ordersByCY.show(5)
    ordersByCY.select($"CustomerID",$"Year",bround($"Avg",2)).sort($"CustomerID",$"Year").show()

    // Q 7. 客户的平均订单金额是多少?
    val ordersCA = orders3.groupBy("CustomerID").agg(avg("Total").as("C-Avg"), sum("Total").as("C-Sum"))
    ordersCA.sort(col("C-Avg").desc).show()
  }

《PySpark原理深入与编程实战》