Spark SQL编程案例1_电商数据分析
本节通过几个案例的学习,掌握使用Spark SQL进行大数据分析的复杂用法。
【示例】电商数据分析示例。使用nw数据集,回答以下问题:
- 每个客户下了多少订单?
- 每个国家的订单有多少?
- 每月/年有多少订单?
- 每个客户的年销售总额是多少?
- 客户每年的平均订单是多少?
实现代码如下所示:
def main(args: Array[String]): Unit = { // 0) 创建SparkSession的实例 val spark = SparkSession.builder() .master("local[*]") .appName("Spark Basic Example") .getOrCreate() // 1、读取源数据文件 val filePath = "src/main/resources/" // 加载订单数据集到DataFrame中 val orders = spark.read .option("header","true") .option("inferSchema","true") .csv(filePath + "NW-Orders-01.csv") println("订单有" + orders.count() + "行") orders.printSchema() orders.show(3) // 加载订单明细数据集到DataFrame中 val orderDetails = spark.read .option("header","true") .option("inferSchema","true") .csv(filePath + "NW-Order-Details.csv") println("订单明细有" + orderDetails.count() + "行") orderDetails.printSchema() orderDetails.show(3) // 2、每个客户下了多少订单? val orderByCustomer = orders.groupBy("CustomerID").count() orderByCustomer.sort(col("count").desc).show(3) // 3、每个国家的订单有多少? val orderByCountry = orders.groupBy("ShipCountry").count() orderByCountry.sort(col("count").desc).show(3) //# 对于后面三个问题,需要对数据进行一些转换 //# 1. 向Orders DataFrame增加一个OrderTotal列 //# 1.1. 计算每个订单明细的实际金额 //# 1.2. 根据order id统计每张订单的总金额 //# 1.3. 对order details & orders进行等值内连接,增加订单总金额 //# 1.4. 检查是否有任何null列 //# 2. 增加一个date列 //# 3. 增加month和year //# 1.1. 向order details中增加每行的小计(每个订单明细的实际金额) import spark.implicits._ val orderDetails1 = orderDetails .select($"OrderID",(($"UnitPrice" * $"Qty") - ($"UnitPrice" * $"Qty") * $"Discount").as("OrderPrice")) orderDetails1.show(5) //# 1.2. 根据order id统计每张订单的总金额 // val orderTot = orderDetails1.groupBy("OrderID").sum("OrderPrice").alias("OrderTotal") val orderTot = orderDetails1.groupBy("OrderID").agg(sum("OrderPrice").as("OrderTotal")) // orderTot.sort("OrderID").show(5) // orderTot.sort($"OrderTotal".desc).show(5) orderTot.select($"OrderID",bround($"OrderTotal",2)).sort("OrderID").show(5) //# 1.3. 对order details & orders进行等值内连接,增加订单总金额 val orders1 = orders .join(orderTot, orders("OrderID").equalTo(orderTot("OrderID")), "inner") .select( orders("OrderID"), orders("CustomerID"), orders("OrderDate"), orders("ShipCountry").alias("ShipCountry"), orderTot("OrderTotal").alias("Total") ) orders1.sort("CustomerID").show() // # 1.4. 检查是否有任何null列 orders1.filter(orders1("Total").isNull).show() // # 2. 增加一个date列 val orders2 = orders1.withColumn("Date",to_date(orders1("OrderDate"))) orders2.printSchema() orders2.show(2) // # 3. 增加month和year val orders3 = orders2.withColumn("Month",month($"OrderDate")).withColumn("Year",year($"OrderDate")) orders3.show(2) // Q 4. 每月/年有多少订单金额? val ordersByYM = orders3.groupBy("Year","Month").agg(sum("Total").as("Total")) ordersByYM.select($"Year",$"Month",bround($"Total",2) as "Total").sort($"Year",$"Month").show() // Q 5. 每个客户的年销售总额是多少? var ordersByCY = orders3.groupBy("CustomerID","Year").agg(sum("Total").as("Total")) ordersByCY.sort($"CustomerID",$"Year").show() // Q 6. 客户每年的平均订单是多少? ordersByCY = orders3.groupBy("CustomerID","Year").agg(avg("Total").as("Avg")) // ordersByCY.show(5) ordersByCY.select($"CustomerID",$"Year",bround($"Avg",2)).sort($"CustomerID",$"Year").show() // Q 7. 客户的平均订单金额是多少? val ordersCA = orders3.groupBy("CustomerID").agg(avg("Total").as("C-Avg"), sum("Total").as("C-Sum")) ordersCA.sort(col("C-Avg").desc).show() }