发布日期:2022-03-16 VIP内容

案例_Spark SQL实现数据ETL到Hive ODS

Spark SQL还支持读取和写入存储在Apache Hive中的数据。Spark支持两种SQL方言:Spark的SQL方言和Hive查询语言(HQL)。Spark SQL支持HiveQL语法,同时支持Hive SerDes和UDF,可以访问现有的Hive仓库。

同时Spark SQL支持各种数据源,如文件数据源、JDBC数据源等。所以我们可以结合Spark SQL和Hive,很容易地实现数据的ETL操作。接下来,我们将通过示例演示如何通过Spark SQL实现ETL任务,将数据分别从文件或MySQL中抽取到Hive数据仓库的ODS层叠 (数据贴源层)中。

在本案例中,我们将学习到如下内容:

  • Spark SQL读写Hive的API;
  • 实现从数据文件到Hive ODS的ETL任务;
  • 实现从MySQL到Hive ODS的ETL任务;
  • Spark整合Hive数据仓库的环境配置;
  • IDEA中Spark读写Hive数仓的开发配置。

Spark SQL读写Hive的API介绍

读取Hive表数据

可以使用SparkSession或DataFrameReader的table()方法从一个Hive metastore的注册表中加载一个DataFrame。

例如,我们使用table方法加载Hive表test中的数据:

val df = spark.read.table("test")
df.show

或者:

val df = spark.table("wc")
df.show

写数据到Hive表

将DataFrame/Dataset保存到Hive表中,使用DataFrameWriter的saveAsTable()方法,它会将DataFrame数据保存到Hive表中,并在Hive metastore中注册。

例如:

df.write.saveAsTable("test")

使用saveAsTable()方法时,如果Hive表不存在,这会自动创建该Hive表。

默认情况下,如果不指定自定义表路径的话,Spark将把数据写到warehouse目录下的默认表路径。当删除表时,默认的表路径也将被删除。

实现从数据文件到Hive ODS的ETL任务

在我们的PBLP平台下,有一个北京地铁刷卡数据集metro.dat,包含北京地铁某一天用户刷卡数据。该数据位于PBLP平台的如下位置:~/data/spark/jt/。其中部分内容如下所示:

现在我们需要将它抽取到Hive的ODS层(数据贴源层),以备后续的数据处理和分析。下面我们编写这个ETL任务实现代码。

1)编写一个从文件数据源抽取数据到DataFrame中的方法extractFromFile。代码如下:

  // extract from file
  /**
   * @param spark      SparkSession实例
   * @param filePath   要加载的文件路径
   * @param schema     指定的模式
   *
   * @return           返回一个DataFrame(Dataset[Row])
   *
   */
  def extractFromFile(spark: SparkSession,
                      filePath: String,
                      schema: StructType): Dataset[Row] = {
    // 读取文件数据源,创建DataFrame
    val df = spark.read
      .option("header",false)
      .option("inferSchema",false)
      .option("sep","\t")
      .schema(schema)
      .csv(filePath)

    // 返回
    df
  }

2)编写一个将DataFrame中装载到Hive数仓的方法loadToHive。代码如下:

  // load to hive
  /**
   * @param spark
   * @param df       要装载到Hive中的DataFrame
   * @param db       要装载到的Hive数据库
   * @param tb       要装载到的Hive ODS表
   * @param partitionColumn 指定分区列
   *
   * @return unit
   */
  def loadToHive(spark: SparkSession,
                 df:Dataset[Row],
                 db:String,
                 tb:String,
                 partitionColumn: Option[String] = None) = {
    spark.sql(s"use $db")          // 打开指定数据库,这里使用了字符串插值

    // 有的表需要分区,有的不需要。这里使用模式匹配来分别处理
    partitionColumn match{
      case Some(column) =>
        df.write
          .format("parquet")
          .mode(SaveMode.Overwrite)      // 覆盖
          .partitionBy(column)           // 指定分区
          .saveAsTable(tb)
      case None =>
        df.write
          .format("parquet")
          .mode(SaveMode.Overwrite)      // 覆盖
          .saveAsTable(tb)               // saveAsTable()方法:会将DataFrame数据保存到Hive表中
    }

  }

3)将以上两个方法封装到一个ETL方法中,方法取名为eltFromFileToHive,代码如下:

  // 定义一个ELT方法,包含 extract from file + load to Hive
  def eltFromFileToHive(spark: SparkSession,
                        fileMap: Map[String,String],
                        schema:StructType, hiveMap: Map[String,String]) = {
    // extract from file
    val filePath = fileMap("filePath")    // 提取传入的文件路径
    val df = extractFromFile(spark, filePath, schema)   // 调用抽取文件数据源的方法,返回一个 DataFrame

    // load to hive
    val database = hiveMap("db")      // 数据库
    val table = hiveMap("tb")         // 数据表
    val partitionColumn = hiveMap.get("partitionColumn")  // 分区列,注意这里是Option类型
    loadToHive(spark, df, database, table, partitionColumn)  // 调用装载数据仓库的方法
  }

4)接下来,我们定义一个Driver程序,并指定好相应的参数,调用上面的eltFromFileToHive方法实现从文件数据源ETL数据到Hive ODS层。

  def main(args: Array[String]): Unit = {
    // 1) 创建SparkSession的实例
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("bus etl job")
      .enableHiveSupport()        // 需要启用对Hive的支持
      .getOrCreate()

    // 打开Hive动态分区的标志
    spark.sqlContext.setConf("hive.exec.dynamic.partition","true")
    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict")

    // 2)执行ETL
    busEtlTask(spark,args)

    // 3)关闭SparkSession
    spark.close()
  }

  // 对公交数据执行ETL的方法
  def busEtlTask(spark:SparkSession,args: Array[String]): Unit = {
    // 定义数据文件路径
    val busFile = "~/data/spark/jt/metro.dat"     // 公交刷卡数据集路径

    // 定义文件选项配置
    val busFileOptions = Map("filePath" -> busFile)

    // 定义字段
    val busFields = Seq(
      StructField("card_id", StringType, true),
      StructField("card_type", StringType, true),
      StructField("trade_type", StringType, true),

      StructField("mark_time", TimestampType, true),
      StructField("mark_line_id", StringType, true),
      StructField("mark_station", StringType, true),
      StructField("mark_bus_id", StringType, true),
      StructField("mark_tms_id", StringType, true),

      StructField("trade_time", TimestampType, true),
      StructField("trade_line_id", StringType, true),
      StructField("trade_station", StringType, true),
      StructField("trade_bus_id", StringType, true),
      StructField("trade_tms_id", StringType, true),

      StructField("trade_date", StringType, true)
    )

    // 定义schema
    val busSchema = StructType(busFields)

    // 定义Hive选项配置(db:数据库,tb:hive表,partitionColumn:分区列(Option类型))
    val hiveOptions = Map("db" -> "jt", "tb" -> "s_ods_bus", "partitionColumn" -> "trade_date")

    // 调用前面定义的ELT方法
    eltFromFileToHive(spark, busFileOptions, busSchema, hiveOptions)

    // 测试ETL是否成功
    spark.table("jt.s_ods_bus").show(5)
  }

执行以上程序,如果一切正常,则会输出如下内容:

+-------+---------+----------+-------------------+------------+------------+-----------+------------+----------+-------------+-------------+------------+------------+----------+
|card_id|card_type|trade_type|          mark_time|mark_line_id|mark_station|mark_bus_id| mark_tms_id|trade_time|trade_line_id|trade_station|trade_bus_id|trade_tms_id|trade_date|
+-------+---------+----------+-------------------+------------+------------+-----------+------------+----------+-------------+-------------+------------+------------+----------+
|C406545|       01|        06|2021-08-08 08:01:12|         52 |  东单路口东|      TX379|202108081390|      null|         null|         null|        null|        null|2021-08-08|
|C488438|       01|        06|2021-08-08 08:05:55|         52 |      王府井|      SX660|202108088333|      null|         null|         null|        null|        null|2021-08-08|
|C426146|       01|        06|2021-08-08 08:06:37|         52 |    马官营西|      BE530|202108088328|      null|         null|         null|        null|        null|2021-08-08|
|C788075|       01|        06|2021-08-08 08:35:48|         72 | 木樨园桥北 |      QH494|202108084496|      null|         null|         null|        null|        null|2021-08-08|
|C341050|       01|        06|2021-08-08 08:44:40|          1 |    东翠路口|      BF613|202108088094|      null|         null|         null|        null|        null|2021-08-08|
+-------+---------+----------+-------------------+------------+------------+-----------+------------+----------+-------------+-------------+------------+------------+----------+
only showing top 5 rows

实现从MySQL到Hive ODS的ETL任务

在我们的PBLP平台的MySQL数据库中,已经准备有一个北京地铁站点信息数据表jt.metro_station_tb,包含北京各条地铁线路和站点信息以及经纬度。其中部分内容如下所示:

现在我们需要将它抽取到Hive的ODS层(数据贴源层),以备后续的数据处理和分析。下面我们编写这个ETL任务实现代码。

1)编写一个从MySQL数据源抽取数据到DataFrame中的方法extractFromJDBC。代码如下:

  // extract from mysql
  /**
   * @param spark      SparkSession实例
   * @param jdbcMap    要加载的JDBC配置项
   *
   * @return           返回一个DataFrame(Dataset[Row])
   *
   */
  def extractFromJDBC(spark: SparkSession, jdbcMap:Map[String,String]): Dataset[Row] = {
    // 读取JDBC数据源,创建DataFrame
    val df = spark
      .read
      .format("jdbc")
      .options(jdbcMap)
      .load()

    // 返回
    df
  }

2)编写一个将DataFrame中装载到Hive数仓的方法loadToHIve。这里我们重用前面定义的loadToHive方法。

3)将以上两个方法封装到一个ETL方法中,方法取名为eltFromJDBCToHive,代码如下:

  // 定义一个ELT方法,包含extract from mysql + load to hive
  def eltFromJDBCToHive(spark: SparkSession, 
                        jdbcMap:Map[String,String], 
                        hiveMap: Map[String,String]) = {
    // extract from jdbc
    val df = extractFromJDBC(spark, jdbcMap)

    // load to hive
    val database = hiveMap("db")       // 数据库
    val table = hiveMap("tb")          // 数据表
    loadToHive(spark, df, database, table, None)
  }

4)接下来,我们定义一个Driver程序,并指定好相应的参数,调用上面的eltFromJDBCToHive方法实现从文件数据源ETL数据到Hive ODS层。

  def main(args: Array[String]): Unit = {

    // 创建SparkSession
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("jt OD")
      .enableHiveSupport()
      .getOrCreate()

    // 对地铁站点数据执行ELT
    busSiteELTask(spark)

    // 关闭SparkSession
    spark.close()
  }

  // 地铁站点数据ELT方法
  def busSiteELTask(spark: SparkSession): Unit = {
    // 1. 定义文件数据源配置
    val DB_URL= "jdbc:mysql://localhost:3306/jt?useSSL=false"       // 数据库连接url
    val busJdbcMap = Map(
      "url" -> DB_URL,                	// jdbc url
      "dbtable" -> "bus_station_tb",    // 要读取的数据表
      "user" -> "root",               	// mysql账号
      "password" -> "admin"           	// mysql密码
    )

    // 2. 定义Hive配置
    val hiveOptions = Map("db" -> "jt", "tb" -> "bus_station_tb")

    // 3. 执行ELT
    EtlUtil.eltFromJDBCToHive(spark, busJdbcMap, hiveOptions)

    // 测试ETL是否成功
    spark.table("jt.bus_station_tb").show(5)
  }

执行以上程序,如果一切正常,则会输出如下内容:

+-------+---------+-------------+----------+------------+-----------------+----------------+
|line_no|line_name|station_order|station_id|station_name|station_longitude|station_latitude|
+-------+---------+-------------+----------+------------+-----------------+----------------+
|      1|      1路|            1|        11| 四惠枢纽站 |      116.5027841|     39.91309207|
|      1|      1路|            2|        12|   八王坟西 |      116.4133837|     39.91092455|
|      1|      1路|            3|        13|      郎家园|      116.4729916|     39.91221366|
|      1|      1路|            4|        14|    大北窑东|      116.4133837|     39.91092455|
|      1|      1路|            5|        15|   大北窑西 |      116.4133837|     39.91092455|
+-------+---------+-------------+----------+------------+-----------------+----------------+
only showing top 5 rows

Spark整合Hive数据仓库的环境配置

要通过Spark SQL访问Hive数据表,需要先进行以下配置。

1)配置Hive支持

要使Spark SQL ETL作业能在集群上成功运行(读写Hive表),那么需要在Spark集群启动之前,将Hive的配置文件hive-site.xml、HDFS的配置core-site.xml (用于安全配置)和hdfs-site.xml (用于HDFS 配置) 放到Spark的conf/目录下。

说明:如果没有配置hive-site.xml,上下文会自动在当前目录中创建metastore_db,并创建一个由spark.sql.warehouse.dir配置的目录,它默认为启动Spark应用程序的当前目录下的spark-warehouse目录。

2)拷贝JDBC驱动

因为Spark SQL需要访问hive metastore,所以需要将JDBC驱动配置在driver以及所有executors的classpath中。最简单的方式是在提交应用程序或启动Spark shell时,使用--jars选项指定要提供的JAR文件。或者,将MySQL的JDBC驱动jar包拷贝到$SPARK_HOME/jars/目录下。

3)启动Hive Metastore Server

Hive Metastore Server是Spark SQL应用程序将要连接到的服务器,用于获取Hive表的元数据。启动Hive Metastore Server的命令如下:

$ $HIVE_HOME/bin/hive --service metastore

IDEA中Spark读写Hive数仓的开发配置

当我们在IntellIJ IDEA中开发Spark ETL程序时,需要配置IDEA中Spark程序能访问到Hive。这需要对IDEA中的项目做如下配置。

1)配置Hive支持

需要将Hive的配置文件hive-site.xml、HDFS的配置core-site.xml (用于安全配置)和hdfs-site.xml (用于HDFS 配置) 放到项目的resources/目录下。

2)增加项目依赖

需要在pom.xml文件中添加hive依赖和jdbc驱动依赖。如下所示:

        <!-- hive -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>

        <!-- mysql jdbc -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
说明:本案例在《Spark实用教程》一书中有更为详细的解释以及部署说明,可参考。