发布日期:2022-04-12 VIP内容

赛题模拟实现-离线数据抽取

数据准备

这里使用的是Kaggle上的公共数据集,来自巴西olist商店电子商务订单。该数据集提供 2016 年至 2018 年在巴西多个市场下订单的 10 万份信息。其功能允许从多个维度查看订单:从订单状态、价格、付款和货运性能到客户位置、产品属性以及客户撰写的最后评论。 数据说明。

我们选取了其中4张表,E-R模型图如下:

对这4张表分别说明如下:

  • orders:订单表。每个订单包含多个商品项。
  • order_items:订单商品项表。
  • customers:客户信息表。
  • products:商品信息表。

因为数据集本身是文件性质,而竞赛任务中要求是从MySQL数据源抽取数据,因此我们准备一个MySQL数据源:将Olist上述四张表导入到MySQL中。请按以下说明导入:

1)先在MySQL中创建数据库olist:

mysql> create database olist;

2)然后执行以下命令导入数据库(在终端窗口执行):

$ mysql -uroot -p olist < olist.sql

3)测试导入是否成功:

mysql> use olist;
mysql> select * from products limit 5;

实现任务二:离线数据抽取

具体内容:

按照要求使用Scala语言完成特定函数的编写,使用Spark抽取MySQL指定数据表中的新增的数据到ODS层的指定的分区表中。

  • (1) 使用Spark抽取MySQL指定数据表中的新增的商品数据到ODS层的指定的分区表中;
  • (2) 使用Spark抽取MySQL指定数据表中的新增的用户数据到ODS层的指定的分区表中;
  • (3) 使用Spark抽取MySQL指定数据表中的新增的订单数据到ODS层的指定的分区表中;

实现原理:

什么是ODS层?这是一个数据仓库的概念,指的是数据运营层ODS(Operation Data Store),是数据准备区,也称为贴源层。

Spark ETL过程:

实现过程:

1)在IDEA中创建一个Maven项目。 可参考小白学苑的教程:使用IntelliJ IDEA开发Spark Maven应用程序

2)新建一个EtlUtil工具类,封装从MySQL中抽取数据的方法函数以及将数据加载到Hive的方法函数。实现代码如下:

package com.bigdataplay2021

import org.apache.spark.sql._
import java.util.Properties

// 定义工具类
object EtlUtil {

  // extract from mysql
  /**
   * @param spark      SparkSession实例
   * @param jdbcMap    要加载的JDBC配置项
   *
   * @return           返回一个DataFrame(Dataset[Row])
   *
   */
  def extractFromJDBC(spark: SparkSession, jdbcMap:Map[String,String]): Dataset[Row] = {
    // 读取JDBC数据源,创建DataFrame
    val df = spark
      .read
      .format("jdbc")
      .options(jdbcMap)
      .load()

    // 返回
    df
  }

  // load to hive
  /**
   * @param spark
   * @param df       要装载到Hive中的DataFrame
   * @param db       要装载到的Hive数据库
   * @param tb       要装载到的Hive ODS表
   * @param partitionColumn 指定分区列
   *
   * @return unit
   */
  def loadToHive(spark: SparkSession,
                 df:Dataset[Row],
                 db:String,
                 tb:String,
                 partitionColumn: Option[String] = None) = {
    spark.sql(s"use $db")          // 打开指定数据库,这里使用了字符串插值

    // 有的表需要分区,有的不需要。这里使用模式匹配来分别处理
    partitionColumn match{
      case Some(column) =>
        df.write
          .format("parquet")
          .mode(SaveMode.Overwrite)      // 覆盖
          .partitionBy(column)           // 指定分区
          .saveAsTable(tb)
      case None =>
        df.write
          .format("parquet")
          .mode(SaveMode.Overwrite)      // 覆盖
          .saveAsTable(tb)               // saveAsTable()方法:会将DataFrame数据保存到Hive表中
    }

  }

  // 定义一个ELT方法,包含extract + load
  def eltFromJDBCToHive(spark: SparkSession,
                        jdbcMap:Map[String,String],
                        hiveMap: Map[String,String]) = {
    // extract from jdbc
    val df = extractFromJDBC(spark, jdbcMap)

    // load to hive
    val database = hiveMap("db")       // 数据库
    val table = hiveMap("tb")          // 数据表
    loadToHive(spark, df, database, table, None)
  }


  // 定义一个函数,用来将分析结果导出到mysql中
  /**
   *
   * @param db    目标数据表
   * @param df    分析结果集
   */
  def exportToMysql(df:Dataset[Row], tb:String) = {
    // 数据库连接url。注意:这里写出的是mysql的olist数据库
    val DB_URL= "jdbc:mysql://localhost:3306/olist?useSSL=false"

    // 下面创建一个prop 变量用来保存JDBC 连接参数
    val props = new Properties()
    props.put("user", "root")        					// 表示用户名是root
    props.put("password", "admin")   				    // 表示密码是hadoop
    props.put("driver","com.mysql.jdbc.Driver") 		// 表示驱动程序

    df.write.mode(SaveMode.Append).jdbc(DB_URL, tb, props)
  }
}

3)新建一个执行ETL抽取任务的作业类,代码如下:

package com.bigdataplay2021

import org.apache.spark.sql.SparkSession

object EtlJob {
  def main(args: Array[String]): Unit = {
    // 在windows下开发时设置
    System.setProperty("HADOOP_USER_NAME", "hduser")

    // 创建SparkSession
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("bigdata play")
      .enableHiveSupport()
      .getOrCreate()

    // 打开Hive动态分区的标志
    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

    // 1)抽取客户数据到ODS层的指定的维表中
    etlCustomersDim(spark)

    // 2)抽取商品数据到ODS层的指定的维表中
    etlProductsDim(spark)

    // 3)抽取订单数据到ODS层的指定的分区表中
    etlOrdersODS(spark)

    // 4)抽取订单项数据到ODS层的指定的分区表中
    etlOrderItemsODS(spark)
  }

  // 1)抽取客户数据到ODS层的指定的维表中
  def etlCustomersDim(spark: SparkSession) = {
    // 数据库连接url
    val DB_URL= "jdbc:mysql://localhost:3306/olist?useSSL=false"
    val jdbcMap = Map(
      "url" -> DB_URL,                	// jdbc url
      "dbtable" -> "customers",         // 要读取的数据表
      "user" -> "root",               	// mysql账号
      "password" -> "admin"           	// mysql密码
    )

    // 定义ODS配置
    val hiveOptions = Map("db" -> "olistdb", "tb" -> "dim_customers")

    // ELT
    EtlUtil.eltFromJDBCToHive(spark, jdbcMap, hiveOptions)

    // 测试
    spark.table("olistdb.dim_customers").show(5)
  }

  // 2)抽取商品数据到ODS层的指定的维表中
  def etlProductsDim(spark: SparkSession) = {
    // 数据库连接url
    val DB_URL= "jdbc:mysql://localhost:3306/olist?useSSL=false"
    val jdbcMap = Map(
      "url" -> DB_URL,                	// jdbc url
      "dbtable" -> "products",          // 要读取的数据表
      "user" -> "root",               	// mysql账号
      "password" -> "admin"           	// mysql密码
    )

    // 定义ODS配置
    val hiveOptions = Map("db" -> "olistdb", "tb" -> "dim_products")

    // ELT
    EtlUtil.eltFromJDBCToHive(spark, jdbcMap, hiveOptions)

    // 测试
    spark.table("olistdb.dim_products").show(5)
  }

  // 3)抽取订单数据到ODS层的指定的分区表中 - 事实表,分区存储?
  def etlOrdersODS(spark: SparkSession) = {
    val DB_URL= "jdbc:mysql://localhost:3306/olist?useSSL=false"       // 数据库连接url

    val jdbcMap = Map(
      "url" -> DB_URL,                	// jdbc url
      "dbtable" -> "orders",              // 要读取的数据表
      "user" -> "root",               	// mysql账号
      "password" -> "admin"           	// mysql密码
    )

    // 定义ODS配置
    val hiveOptions = Map("db" -> "olistdb", "tb" -> "ods_orders")
    // 如果分区的话
    // val hiveOptions = Map("db" -> "olistdb", "tb" -> "ods_orders", "partitionColumn" -> "order_estimated_delivery_date")

    // ELT
    EtlUtil.eltFromJDBCToHive(spark, jdbcMap, hiveOptions)

    // 测试
    spark.table("olistdb.ods_orders").show(5)
  }

  // 4)抽取订单项数据到ODS层的指定的分区表中 - 事实表,分区存储?
  def etlOrderItemsODS(spark: SparkSession) = {
    val DB_URL= "jdbc:mysql://localhost:3306/olist?useSSL=false"       // 数据库连接url

    val jdbcMap = Map(
      "url" -> DB_URL,                	// jdbc url
      "dbtable" -> "order_items",         // 要读取的数据表
      "user" -> "root",               	// mysql账号
      "password" -> "admin"           	// mysql密码
    )

    // 定义ODS配置
    val hiveOptions = Map("db" -> "olistdb", "tb" -> "ods_order_items")
    // 如果分区存储的话
    // val hiveOptions = Map("db" -> "olistdb", "tb" -> "ods_order_items", "partitionColumn" -> "order_item_id")

    // ELT
    EtlUtil.eltFromJDBCToHive(spark, jdbcMap, hiveOptions)

    // 测试
    spark.table("olistdb.ods_order_items").show(5)
  }

}

4)4)项目打包。

将项目打包jar包以便部署。打包命令(在IDEA Terminal终端窗口中执行):

$ mvn clean package

如下图所示:

5)5)项目部署。

首先启动Hive Metastore服务,并保持运行(不要关闭):

$ hive --service metastore

将项目部署到Spark上运行,部署命令:

$ spark-submit --class com.bigdataplay2021.EtlJob hellospark-1.0-SNAPSHOT.jar

注:以上代码和操作中,涉及到操作的目录路径、包名、类名等,可自行修改保持正确即可。