案例_Spark SQL实现数据ETL到Hive ODS
Spark SQL还支持读取和写入存储在Apache Hive中的数据。Spark支持两种SQL方言:Spark的SQL方言和Hive查询语言(HQL)。Spark SQL支持HiveQL语法,同时支持Hive SerDes和UDF,可以访问现有的Hive仓库。
同时Spark SQL支持各种数据源,如文件数据源、JDBC数据源等。所以我们可以结合Spark SQL和Hive,很容易地实现数据的ETL操作。接下来,我们将通过示例演示如何通过Spark SQL实现ETL任务,将数据分别从文件或MySQL中抽取到Hive数据仓库的ODS层叠 (数据贴源层)中。
在本案例中,我们将学习到如下内容:
- Spark SQL读写Hive的API;
- 实现从数据文件到Hive ODS的ETL任务;
- 实现从MySQL到Hive ODS的ETL任务;
- Spark整合Hive数据仓库的环境配置;
- IDEA中Spark读写Hive数仓的开发配置。
Spark SQL读写Hive的API介绍
读取Hive表数据
可以使用SparkSession或DataFrameReader的table()方法从一个Hive metastore的注册表中加载一个DataFrame。
例如,我们使用table方法加载Hive表test中的数据:
val df = spark.read.table("test")
df.show
或者:
val df = spark.table("wc")
df.show
写数据到Hive表
将DataFrame/Dataset保存到Hive表中,使用DataFrameWriter的saveAsTable()方法,它会将DataFrame数据保存到Hive表中,并在Hive metastore中注册。
例如:
df.write.saveAsTable("test")
使用saveAsTable()方法时,如果Hive表不存在,这会自动创建该Hive表。
默认情况下,如果不指定自定义表路径的话,Spark将把数据写到warehouse目录下的默认表路径。当删除表时,默认的表路径也将被删除。
实现从数据文件到Hive ODS的ETL任务
在我们的PBLP平台下,有一个北京地铁刷卡数据集metro.dat,包含北京地铁某一天用户刷卡数据。该数据位于PBLP平台的如下位置:~/data/spark/jt/。其中部分内容如下所示:
现在我们需要将它抽取到Hive的ODS层(数据贴源层),以备后续的数据处理和分析。下面我们编写这个ETL任务实现代码。
1)编写一个从文件数据源抽取数据到DataFrame中的方法extractFromFile。代码如下:
// extract from file
/**
* @param spark SparkSession实例
* @param filePath 要加载的文件路径
* @param schema 指定的模式
*
* @return 返回一个DataFrame(Dataset[Row])
*
*/
def extractFromFile(spark: SparkSession,
filePath: String,
schema: StructType): Dataset[Row] = {
// 读取文件数据源,创建DataFrame
val df = spark.read
.option("header",false)
.option("inferSchema",false)
.option("sep","\t")
.schema(schema)
.csv(filePath)
// 返回
df
}
2)编写一个将DataFrame中装载到Hive数仓的方法loadToHive。代码如下:
// load to hive
/**
* @param spark
* @param df 要装载到Hive中的DataFrame
* @param db 要装载到的Hive数据库
* @param tb 要装载到的Hive ODS表
* @param partitionColumn 指定分区列
*
* @return unit
*/
def loadToHive(spark: SparkSession,
df:Dataset[Row],
db:String,
tb:String,
partitionColumn: Option[String] = None) = {
spark.sql(s"use $db") // 打开指定数据库,这里使用了字符串插值
// 有的表需要分区,有的不需要。这里使用模式匹配来分别处理
partitionColumn match{
case Some(column) =>
df.write
.format("parquet")
.mode(SaveMode.Overwrite) // 覆盖
.partitionBy(column) // 指定分区
.saveAsTable(tb)
case None =>
df.write
.format("parquet")
.mode(SaveMode.Overwrite) // 覆盖
.saveAsTable(tb) // saveAsTable()方法:会将DataFrame数据保存到Hive表中
}
}
3)将以上两个方法封装到一个ETL方法中,方法取名为eltFromFileToHive,代码如下:
// 定义一个ELT方法,包含 extract from file + load to Hive
def eltFromFileToHive(spark: SparkSession,
fileMap: Map[String,String],
schema:StructType, hiveMap: Map[String,String]) = {
// extract from file
val filePath = fileMap("filePath") // 提取传入的文件路径
val df = extractFromFile(spark, filePath, schema) // 调用抽取文件数据源的方法,返回一个 DataFrame
// load to hive
val database = hiveMap("db") // 数据库
val table = hiveMap("tb") // 数据表
val partitionColumn = hiveMap.get("partitionColumn") // 分区列,注意这里是Option类型
loadToHive(spark, df, database, table, partitionColumn) // 调用装载数据仓库的方法
}
4)接下来,我们定义一个Driver程序,并指定好相应的参数,调用上面的eltFromFileToHive方法实现从文件数据源ETL数据到Hive ODS层。
def main(args: Array[String]): Unit = {
// 1) 创建SparkSession的实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("bus etl job")
.enableHiveSupport() // 需要启用对Hive的支持
.getOrCreate()
// 打开Hive动态分区的标志
spark.sqlContext.setConf("hive.exec.dynamic.partition","true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode","nonstrict")
// 2)执行ETL
busEtlTask(spark,args)
// 3)关闭SparkSession
spark.close()
}
// 对公交数据执行ETL的方法
def busEtlTask(spark:SparkSession,args: Array[String]): Unit = {
// 定义数据文件路径
val busFile = "~/data/spark/jt/metro.dat" // 公交刷卡数据集路径
// 定义文件选项配置
val busFileOptions = Map("filePath" -> busFile)
// 定义字段
val busFields = Seq(
StructField("card_id", StringType, true),
StructField("card_type", StringType, true),
StructField("trade_type", StringType, true),
StructField("mark_time", TimestampType, true),
StructField("mark_line_id", StringType, true),
StructField("mark_station", StringType, true),
StructField("mark_bus_id", StringType, true),
StructField("mark_tms_id", StringType, true),
StructField("trade_time", TimestampType, true),
StructField("trade_line_id", StringType, true),
StructField("trade_station", StringType, true),
StructField("trade_bus_id", StringType, true),
StructField("trade_tms_id", StringType, true),
StructField("trade_date", StringType, true)
)
// 定义schema
val busSchema = StructType(busFields)
// 定义Hive选项配置(db:数据库,tb:hive表,partitionColumn:分区列(Option类型))
val hiveOptions = Map("db" -> "jt", "tb" -> "s_ods_bus", "partitionColumn" -> "trade_date")
// 调用前面定义的ELT方法
eltFromFileToHive(spark, busFileOptions, busSchema, hiveOptions)
// 测试ETL是否成功
spark.table("jt.s_ods_bus").show(5)
}
执行以上程序,如果一切正常,则会输出如下内容:
+-------+---------+----------+-------------------+------------+------------+-----------+------------+----------+-------------+-------------+------------+------------+----------+ |card_id|card_type|trade_type| mark_time|mark_line_id|mark_station|mark_bus_id| mark_tms_id|trade_time|trade_line_id|trade_station|trade_bus_id|trade_tms_id|trade_date| +-------+---------+----------+-------------------+------------+------------+-----------+------------+----------+-------------+-------------+------------+------------+----------+ |C406545| 01| 06|2021-08-08 08:01:12| 52 | 东单路口东| TX379|202108081390| null| null| null| null| null|2021-08-08| |C488438| 01| 06|2021-08-08 08:05:55| 52 | 王府井| SX660|202108088333| null| null| null| null| null|2021-08-08| |C426146| 01| 06|2021-08-08 08:06:37| 52 | 马官营西| BE530|202108088328| null| null| null| null| null|2021-08-08| |C788075| 01| 06|2021-08-08 08:35:48| 72 | 木樨园桥北 | QH494|202108084496| null| null| null| null| null|2021-08-08| |C341050| 01| 06|2021-08-08 08:44:40| 1 | 东翠路口| BF613|202108088094| null| null| null| null| null|2021-08-08| +-------+---------+----------+-------------------+------------+------------+-----------+------------+----------+-------------+-------------+------------+------------+----------+ only showing top 5 rows
实现从MySQL到Hive ODS的ETL任务
在我们的PBLP平台的MySQL数据库中,已经准备有一个北京地铁站点信息数据表jt.metro_station_tb,包含北京各条地铁线路和站点信息以及经纬度。其中部分内容如下所示:
现在我们需要将它抽取到Hive的ODS层(数据贴源层),以备后续的数据处理和分析。下面我们编写这个ETL任务实现代码。
1)编写一个从MySQL数据源抽取数据到DataFrame中的方法extractFromJDBC。代码如下:
// extract from mysql
/**
* @param spark SparkSession实例
* @param jdbcMap 要加载的JDBC配置项
*
* @return 返回一个DataFrame(Dataset[Row])
*
*/
def extractFromJDBC(spark: SparkSession, jdbcMap:Map[String,String]): Dataset[Row] = {
// 读取JDBC数据源,创建DataFrame
val df = spark
.read
.format("jdbc")
.options(jdbcMap)
.load()
// 返回
df
}
2)编写一个将DataFrame中装载到Hive数仓的方法loadToHIve。这里我们重用前面定义的loadToHive方法。
3)将以上两个方法封装到一个ETL方法中,方法取名为eltFromJDBCToHive,代码如下:
// 定义一个ELT方法,包含extract from mysql + load to hive
def eltFromJDBCToHive(spark: SparkSession,
jdbcMap:Map[String,String],
hiveMap: Map[String,String]) = {
// extract from jdbc
val df = extractFromJDBC(spark, jdbcMap)
// load to hive
val database = hiveMap("db") // 数据库
val table = hiveMap("tb") // 数据表
loadToHive(spark, df, database, table, None)
}
4)接下来,我们定义一个Driver程序,并指定好相应的参数,调用上面的eltFromJDBCToHive方法实现从文件数据源ETL数据到Hive ODS层。
def main(args: Array[String]): Unit = {
// 创建SparkSession
val spark = SparkSession.builder()
.master("local[*]")
.appName("jt OD")
.enableHiveSupport()
.getOrCreate()
// 对地铁站点数据执行ELT
busSiteELTask(spark)
// 关闭SparkSession
spark.close()
}
// 地铁站点数据ELT方法
def busSiteELTask(spark: SparkSession): Unit = {
// 1. 定义文件数据源配置
val DB_URL= "jdbc:mysql://localhost:3306/jt?useSSL=false" // 数据库连接url
val busJdbcMap = Map(
"url" -> DB_URL, // jdbc url
"dbtable" -> "bus_station_tb", // 要读取的数据表
"user" -> "root", // mysql账号
"password" -> "admin" // mysql密码
)
// 2. 定义Hive配置
val hiveOptions = Map("db" -> "jt", "tb" -> "bus_station_tb")
// 3. 执行ELT
EtlUtil.eltFromJDBCToHive(spark, busJdbcMap, hiveOptions)
// 测试ETL是否成功
spark.table("jt.bus_station_tb").show(5)
}
执行以上程序,如果一切正常,则会输出如下内容:
+-------+---------+-------------+----------+------------+-----------------+----------------+ |line_no|line_name|station_order|station_id|station_name|station_longitude|station_latitude| +-------+---------+-------------+----------+------------+-----------------+----------------+ | 1| 1路| 1| 11| 四惠枢纽站 | 116.5027841| 39.91309207| | 1| 1路| 2| 12| 八王坟西 | 116.4133837| 39.91092455| | 1| 1路| 3| 13| 郎家园| 116.4729916| 39.91221366| | 1| 1路| 4| 14| 大北窑东| 116.4133837| 39.91092455| | 1| 1路| 5| 15| 大北窑西 | 116.4133837| 39.91092455| +-------+---------+-------------+----------+------------+-----------------+----------------+ only showing top 5 rows
Spark整合Hive数据仓库的环境配置
要通过Spark SQL访问Hive数据表,需要先进行以下配置。
1)配置Hive支持
要使Spark SQL ETL作业能在集群上成功运行(读写Hive表),那么需要在Spark集群启动之前,将Hive的配置文件hive-site.xml、HDFS的配置core-site.xml (用于安全配置)和hdfs-site.xml (用于HDFS 配置) 放到Spark的conf/目录下。
说明:如果没有配置hive-site.xml,上下文会自动在当前目录中创建metastore_db,并创建一个由spark.sql.warehouse.dir配置的目录,它默认为启动Spark应用程序的当前目录下的spark-warehouse目录。
2)拷贝JDBC驱动
因为Spark SQL需要访问hive metastore,所以需要将JDBC驱动配置在driver以及所有executors的classpath中。最简单的方式是在提交应用程序或启动Spark shell时,使用--jars选项指定要提供的JAR文件。或者,将MySQL的JDBC驱动jar包拷贝到$SPARK_HOME/jars/目录下。
3)启动Hive Metastore Server
Hive Metastore Server是Spark SQL应用程序将要连接到的服务器,用于获取Hive表的元数据。启动Hive Metastore Server的命令如下:
$ $HIVE_HOME/bin/hive --service metastore
IDEA中Spark读写Hive数仓的开发配置
当我们在IntellIJ IDEA中开发Spark ETL程序时,需要配置IDEA中Spark程序能访问到Hive。这需要对IDEA中的项目做如下配置。
1)配置Hive支持
需要将Hive的配置文件hive-site.xml、HDFS的配置core-site.xml (用于安全配置)和hdfs-site.xml (用于HDFS 配置) 放到项目的resources/目录下。
2)增加项目依赖
需要在pom.xml文件中添加hive依赖和jdbc驱动依赖。如下所示:
<!-- hive -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
<!-- mysql jdbc -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
说明:本案例在《Spark实用教程》一书中有更为详细的解释以及部署说明,可参考。