发布日期:2022-07-30 VIP内容

Spark SQL实现分区表的分区覆盖

在Spark SQL中,如果想把一个DataFrame保存到Hive表中,有两个方法可以使用,分别是insertInto和saveAsTable()。

1、保存DataFrame数据到Hive表中

当使用saveAsTable()方法时:

  • 1)如果表不存在,则会创建表;
  • 2)如果表已经存在,则取决于保存模式(SaveMode):
    • (1) 如果不指定保存模式(默认),则会抛出异常;
    • (2) 如果指定append模式,则会追加df数据到表中,且df的schema与已存在的schema个数要相同(列顺序可以不同),否则出错;
    • (3) 如果指定overwrite模式,则会用df数据覆盖表中已有的数据。

当使用insertInto()方法时,表必须已经存在,且df的schema结构顺序与指定Hive表的schema结构顺序必须一致。其行为也取决于插入模式(SaveMode):

  • (1) 如果不指定保存模式,则默认是append,会追加df数据到表中;
  • (2) 如果指定overwrite模式,则会用df数据覆盖表中已有的数据。

2、对分区表的全表覆盖

在分区表中,当调用saveAsTable()方法和insertInto()方法并均指定SaveMode存储模式为orvewrite时,实现的是分区表的全表覆盖。

什么是“分区表的全表覆盖”呢?请看下面这个示例。

首先创建一个分区表,并写入初始的分区数据。代码如下:

// 构建SparkSession实例对象
val spark = SparkSession
      .builder()
      .appName("SparkHivePartitionOverwrite")
      .master("local")
      .config("spark.sql.parquet.writeLegacyFormat", true)
      .enableHiveSupport()
      .getOrCreate()  

// 创建一个简单的DataFrame
val data = Array(("001", "张三", 21, "2018"), ("002", "李四", 18, "2017"))
val df = spark.createDataFrame(data).toDF("id", "name", "age", "year")

// 创建临时表
df.createOrReplaceTempView("temp_table")

// 定义分区表的表名
val tableName="test_partition"

// 创建分区表(按year字段分区),并写入数据
df.write.mode("overwrite").partitionBy("year").saveAsTable(tableName)

// 读取分区表数据,并显示
spark.table(tableName).show()

执行以上代码,输出内容如下:

+---+----+---+----+
| id|name|age|year|
+---+----+---+----+
|002|李四| 18|2017|
|001|张三| 21|2018|
+---+----+---+----+

接下来,创建另一个名为df1的DataFrame,包含新的数据,代码如下:

val data1 = Array(("011", "Sam", 21, "2018"))
val df1 = spark.createDataFrame(data1).toDF("id", "name", "age", "year")

现在调用insertInto()方法 + overwrite模式,将包含新数据的df1保存到分区表中,代码如下:

// 覆盖插入
df1.write.mode("overwrite").insertInto(tableName)

// 读取分区表中的数据
spark.table(tableName).show()  

执行以上代码,输出内容如下:

+---+----+---+----+
| id|name|age|year|
+---+----+---+----+
|011| Sam| 21|2018|
+---+----+---+----+

从输出结果可以看出,虽然我们新保存的数据只是year=2018分区的数据,但Spark会把原来的所有分区都删除,而不是仅覆盖year=2018分区,结果是分区表中只包含新插入的分区year=2018。

现在换一下,调用saveAsTable()方法 + overwrite模式,将包含新数据的df1保存到分区表中,代码如下:

// 首先删除原来的分区表
spark.sql(s"drop table ${tableName}")  

// 重新创建初始分区表(按year字段分区),并写入数据
df.write.mode("overwrite").partitionBy("year").saveAsTable(tableName)

// 覆盖保存
df1.write.mode("overwrite").partitionBy("year").saveAsTable(tableName) 

// 查询覆盖保存之后的分区表数据
spark.table(tableName).show()    

执行以上代码,输出内容如下:

+---+----+---+----+
| id|name|age|year|
+---+----+---+----+
|011| Sam| 21|2018|
+---+----+---+----+

从输出结果可以看出,虽然我们新保存的数据只是year=2018分区的数据,但Spark会把原来的所有分区都删除,而不是仅覆盖year=2018分区,结果是分区表中只包含新插入的分区year=2018。

小结:也就是说,当对分区表使用overwrite覆盖模式时,不管是调用insertInto()方法,还是调用saveAsTable()方法,都只能全表覆盖,而无法实现分区覆盖(即只覆盖year=2018这个分区)。

3、如何实现对分区表的分区覆盖?

有没有办法实现对分区表的分区覆盖呢?比如对于分区表test_partition,当df1覆盖写入时,只有year=2018这个分区被覆盖,而year=2017这个分区不受影响(得到保留)呢?

如果是在Spark 2.3.0之前,最好的解决方案是启动SQL语句来删除要被覆盖的分区(例如,year=2018),然后再使用append模式将新的df1写入这些分区。除此之外,没有更好的方法。

如果是在Spark 2.3.0及之后,因为Spark在2.3.0中引入了动态分区覆盖模式,可通过启用它来实现分区覆盖。具体实现方法如下。

首先,启用动态分区覆盖模式

val spark = SparkSession
      .builder()
      .appName("SparkHivePartitionOverwrite")
      .master("local")
      .config("spark.sql.parquet.writeLegacyFormat", true)
      .config("spark.sql.sources.partitionOverwriteMode","dynamic")  // 启用动态分区覆盖模式
      .enableHiveSupport()
      .getOrCreate()  

或者:

// 构建SparkSession实例对象
val spark = SparkSession
      .builder()
      .appName("SparkHivePartitionOverwrite")
      .master("local")
      .config("spark.sql.parquet.writeLegacyFormat", true)
      .enableHiveSupport()
      .getOrCreate()  

// 配置动态分区覆盖模式
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

然后,创建一个分区表。实现代码如下:

// 创建一个简单的DataFrame
val data = Array(("001", "张三", 21, "2018"), ("002", "李四", 18, "2017"))
val df = spark.createDataFrame(data).toDF("id", "name", "age", "year")

// 创建临时表
df.createOrReplaceTempView("temp_table")

// 定义分区表的表名
val tableName="test_partition"

// 创建分区表(按year字段分区),并写入数据
df.write.mode("overwrite").partitionBy("year").saveAsTable(tableName)

// 读取分区表数据,并显示
spark.table(tableName).show()

执行以上代码,输出内容如下:

+---+----+---+----+
| id|name|age|year|
+---+----+---+----+
|002|李四| 18|2017|
|001|张三| 21|2018|
+---+----+---+----+

接下来,创建另一个名为df1的DataFrame,包含新的数据,代码如下:

val data1 = Array(("011", "Sam", 21, "2018"))
val df1 = spark.createDataFrame(data1).toDF("id", "name", "age", "year")

现在调用insertInto()方法 + overwrite模式,将包含新数据的df1保存到分区表中,代码如下:

// 覆盖插入
df1.write.mode("overwrite").insertInto(tableName)

// 读取分区表中的数据
spark.table(tableName).show()  

执行以上代码,输出内容如下:

+---+----+---+----+
| id|name|age|year|
+---+----+---+----+
|002|李四| 18|2017|
|011| Sam| 21|2018|
+---+----+---+----+

从输出结果可以看出,这时year=2018这个分区的值被覆盖,而year=2017这个分区的值被保留,从而实现了分区覆盖的目的。

结论:如果想实现分区覆盖,

  • 在Spark 2.3.0之前,最好的解决方案是启动SQL语句来删除要被覆盖的分区,然后再使用append模式写入这些分区。
  • 在Spark 2.3.0及之后:
    • (1) 设置spark.sql.sources.partitionOverwriteMode的值为dynamic,启用动态分区覆盖;
    • (2) 应用insertInto()方法 + overwrite保存模式,实现分区覆盖
  • 如果想用SaveAsTable只覆盖表中的某些分区,这是不可能的。