Spark SQL实现分区表的分区覆盖
在Spark SQL中,如果想把一个DataFrame保存到Hive表中,有两个方法可以使用,分别是insertInto和saveAsTable()。
1、保存DataFrame数据到Hive表中
当使用saveAsTable()方法时:
- 1)如果表不存在,则会创建表;
- 2)如果表已经存在,则取决于保存模式(SaveMode):
- (1) 如果不指定保存模式(默认),则会抛出异常;
- (2) 如果指定append模式,则会追加df数据到表中,且df的schema与已存在的schema个数要相同(列顺序可以不同),否则出错;
- (3) 如果指定overwrite模式,则会用df数据覆盖表中已有的数据。
当使用insertInto()方法时,表必须已经存在,且df的schema结构顺序与指定Hive表的schema结构顺序必须一致。其行为也取决于插入模式(SaveMode):
- (1) 如果不指定保存模式,则默认是append,会追加df数据到表中;
- (2) 如果指定overwrite模式,则会用df数据覆盖表中已有的数据。
2、对分区表的全表覆盖
在分区表中,当调用saveAsTable()方法和insertInto()方法并均指定SaveMode存储模式为orvewrite时,实现的是分区表的全表覆盖。
什么是“分区表的全表覆盖”呢?请看下面这个示例。
首先创建一个分区表,并写入初始的分区数据。代码如下:
// 构建SparkSession实例对象
val spark = SparkSession
.builder()
.appName("SparkHivePartitionOverwrite")
.master("local")
.config("spark.sql.parquet.writeLegacyFormat", true)
.enableHiveSupport()
.getOrCreate()
// 创建一个简单的DataFrame
val data = Array(("001", "张三", 21, "2018"), ("002", "李四", 18, "2017"))
val df = spark.createDataFrame(data).toDF("id", "name", "age", "year")
// 创建临时表
df.createOrReplaceTempView("temp_table")
// 定义分区表的表名
val tableName="test_partition"
// 创建分区表(按year字段分区),并写入数据
df.write.mode("overwrite").partitionBy("year").saveAsTable(tableName)
// 读取分区表数据,并显示
spark.table(tableName).show()
执行以上代码,输出内容如下:
+---+----+---+----+ | id|name|age|year| +---+----+---+----+ |002|李四| 18|2017| |001|张三| 21|2018| +---+----+---+----+
接下来,创建另一个名为df1的DataFrame,包含新的数据,代码如下:
val data1 = Array(("011", "Sam", 21, "2018"))
val df1 = spark.createDataFrame(data1).toDF("id", "name", "age", "year")
现在调用insertInto()方法 + overwrite模式,将包含新数据的df1保存到分区表中,代码如下:
// 覆盖插入
df1.write.mode("overwrite").insertInto(tableName)
// 读取分区表中的数据
spark.table(tableName).show()
执行以上代码,输出内容如下:
+---+----+---+----+ | id|name|age|year| +---+----+---+----+ |011| Sam| 21|2018| +---+----+---+----+
从输出结果可以看出,虽然我们新保存的数据只是year=2018分区的数据,但Spark会把原来的所有分区都删除,而不是仅覆盖year=2018分区,结果是分区表中只包含新插入的分区year=2018。
现在换一下,调用saveAsTable()方法 + overwrite模式,将包含新数据的df1保存到分区表中,代码如下:
// 首先删除原来的分区表
spark.sql(s"drop table ${tableName}")
// 重新创建初始分区表(按year字段分区),并写入数据
df.write.mode("overwrite").partitionBy("year").saveAsTable(tableName)
// 覆盖保存
df1.write.mode("overwrite").partitionBy("year").saveAsTable(tableName)
// 查询覆盖保存之后的分区表数据
spark.table(tableName).show()
执行以上代码,输出内容如下:
+---+----+---+----+ | id|name|age|year| +---+----+---+----+ |011| Sam| 21|2018| +---+----+---+----+
从输出结果可以看出,虽然我们新保存的数据只是year=2018分区的数据,但Spark会把原来的所有分区都删除,而不是仅覆盖year=2018分区,结果是分区表中只包含新插入的分区year=2018。
小结:也就是说,当对分区表使用overwrite覆盖模式时,不管是调用insertInto()方法,还是调用saveAsTable()方法,都只能全表覆盖,而无法实现分区覆盖(即只覆盖year=2018这个分区)。
3、如何实现对分区表的分区覆盖?
有没有办法实现对分区表的分区覆盖呢?比如对于分区表test_partition,当df1覆盖写入时,只有year=2018这个分区被覆盖,而year=2017这个分区不受影响(得到保留)呢?
如果是在Spark 2.3.0之前,最好的解决方案是启动SQL语句来删除要被覆盖的分区(例如,year=2018),然后再使用append模式将新的df1写入这些分区。除此之外,没有更好的方法。
如果是在Spark 2.3.0及之后,因为Spark在2.3.0中引入了动态分区覆盖模式,可通过启用它来实现分区覆盖。具体实现方法如下。
首先,启用动态分区覆盖模式
val spark = SparkSession
.builder()
.appName("SparkHivePartitionOverwrite")
.master("local")
.config("spark.sql.parquet.writeLegacyFormat", true)
.config("spark.sql.sources.partitionOverwriteMode","dynamic") // 启用动态分区覆盖模式
.enableHiveSupport()
.getOrCreate()
或者:
// 构建SparkSession实例对象
val spark = SparkSession
.builder()
.appName("SparkHivePartitionOverwrite")
.master("local")
.config("spark.sql.parquet.writeLegacyFormat", true)
.enableHiveSupport()
.getOrCreate()
// 配置动态分区覆盖模式
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
然后,创建一个分区表。实现代码如下:
// 创建一个简单的DataFrame
val data = Array(("001", "张三", 21, "2018"), ("002", "李四", 18, "2017"))
val df = spark.createDataFrame(data).toDF("id", "name", "age", "year")
// 创建临时表
df.createOrReplaceTempView("temp_table")
// 定义分区表的表名
val tableName="test_partition"
// 创建分区表(按year字段分区),并写入数据
df.write.mode("overwrite").partitionBy("year").saveAsTable(tableName)
// 读取分区表数据,并显示
spark.table(tableName).show()
执行以上代码,输出内容如下:
+---+----+---+----+ | id|name|age|year| +---+----+---+----+ |002|李四| 18|2017| |001|张三| 21|2018| +---+----+---+----+
接下来,创建另一个名为df1的DataFrame,包含新的数据,代码如下:
val data1 = Array(("011", "Sam", 21, "2018"))
val df1 = spark.createDataFrame(data1).toDF("id", "name", "age", "year")
现在调用insertInto()方法 + overwrite模式,将包含新数据的df1保存到分区表中,代码如下:
// 覆盖插入
df1.write.mode("overwrite").insertInto(tableName)
// 读取分区表中的数据
spark.table(tableName).show()
执行以上代码,输出内容如下:
+---+----+---+----+ | id|name|age|year| +---+----+---+----+ |002|李四| 18|2017| |011| Sam| 21|2018| +---+----+---+----+
从输出结果可以看出,这时year=2018这个分区的值被覆盖,而year=2017这个分区的值被保留,从而实现了分区覆盖的目的。
结论:如果想实现分区覆盖,
- 在Spark 2.3.0之前,最好的解决方案是启动SQL语句来删除要被覆盖的分区,然后再使用append模式写入这些分区。
- 在Spark 2.3.0及之后:
- (1) 设置spark.sql.sources.partitionOverwriteMode的值为dynamic,启用动态分区覆盖;
- (2) 应用insertInto()方法 + overwrite保存模式,实现分区覆盖
- 如果想用SaveAsTable只覆盖表中的某些分区,这是不可能的。