使用分区表和分桶表
分区是一种优化技术,用于根据某些属性将表划分为若干部分。
通常,分区只是将特定类型或值的项分组以便更快地访问的一种方法。这样做的好处是,对于只访问部分数据的查询,读取和加载时间会更快。例如,跟踪日志事件、消息和事件时间的日志表可能有数百万个条目,跨度长达数月。将这些条目按天进行分区,可以更快地查询某一天发生的日志事件。
Iceberg通过实现隐藏分区为用户简化了分区。Iceberg没有强制用户在查询时提供单独的分区过滤器,而是在底层处理分区和查询的所有细节。用户不需要维护分区列,甚至不需要理解物理表布局就可以得到准确的查询结果。
Iceberg有几个分区选项。用户可以按年、月、日和小时划分时间戳。Iceberg可以跟踪列值与其分区之间的关系,而不需要额外的列。例如,当在按日(day)分区、带有“YYYY-MM-DD hh:mm:ss”时间戳格式的数据上查询时,查询语句中不需要包含“hh:mm:ss”部分。Iceberg还可以通过标识、散列桶或截断来划分分类列值。
下面通过一个示例来了解如何使用Iceberg分区表,以及Iceberg分区表的一些特性。本示例需要一个数据集文件jd-formated.parquet,该文件包含2017年02月16日到2022年02月15日共五年的京东股票交易数据。首先将该数据集加载到DataFrame中,代码如下:
// 加载数据集到DataFrame val file = "/data/spark/jd/jd-formated.parquet" val jdDF = spark.read.load(file) // 查看schema和部分数据 jdDF.printSchema() jdDF.show()
执行以上代码,输出内容如下:
root |-- Date: date (nullable = true) |-- Close: double (nullable = true) |-- Volume: long (nullable = true) |-- Open: double (nullable = true) |-- High: double (nullable = true) |-- Low: double (nullable = true) +----------+-----+-------+------+-----+-----+ | Date|Close| Volume| Open| High| Low| +----------+-----+-------+------+-----+-----+ |2022-02-15|76.13|6766205| 75.35|76.35| 74.8| |2022-02-14|74.45|5244967| 73.94|74.62|73.01| |2022-02-11|73.98|6673354| 75.97|76.55|73.55| |2022-02-10| 76.4|6432184|75.955|78.39|75.24| |2022-02-09|78.29|7061571| 76.83|78.67|76.61| +----------+-----+-------+------+-----+-----+ only showing top 5 rows
接下来,将jdDF写入到Iceberg数据湖中。Spark 3引入了新的DataFrameWriterV2 API,用于使用DataFrame写入表。推荐使用v2 API有以下几个原因:
(1) 支持CTAS、RTAS和过滤器覆盖。
(2) 所有操作都一致地按名称将列写入表。
(3) partitionedBy支持隐藏分区表达式。
(4) 覆盖行为是显式的,可以是动态的,也可以是用户提供的过滤器。
(5) 每个操作的行为都对应于SQL语句。
- df.writeTo(t).create() 等价于 CREATE TABLE AS SELECT。
- df.writeTo(t).replace() 等价于 REPLACE TABLE AS SELECT。
- df.writeTo(t).append() 等价于 INSERT INTO。
- df.writeTo(t).overwritePartitions() 等价于 dynamic INSERT OVERWRITE。
(6) 创建表。
要运行一个CTAS或RTAS,使用create、replace或createOrReplace操作,代码如下:
val data: DataFrame = ...
data.writeTo("prod.db.table").create()
另外,create、replace或createOrReplace操作还支持表配置方法,如partitionedBy()和tableProperty(),代码如下:
data.writeTo("prod.db.table")
.tableProperty("write.format.default", "orc")
.partitionBy($"level", days($"ts"))
.createOrReplace()
(7) 覆盖数据。
要动态地覆盖分区,可以使用overwritePartitions(),代码如下:
val data: DataFrame = ...
data.writeTo("prod.db.table").overwritePartitions()
要显式覆盖分区,使用overwrite来提供一个过滤器,代码如下:
data.writeTo("prod.db.table").overwrite($"level" === "INFO")
注意: 虽然Spark 3仍然支持v1 DataFrame write API,但不推荐使用。在Spark 3中使用v1 DataFrame API写数据时,可以使用saveAsTable或insertInto来加载带有catalog目录的表。使用format("iceberg")加载一个独立的表引用,该引用不会自动刷新查询使用的表。
继续使用前面几节定义的hadoop_prod catalog和my_db数据库,代码如下:
// 写数据到分区表
jdDF
.sortWithinPartitions("Date") // 要先排序
.writeTo("hadoop_prod.jd_db.stock_part") // 写入iceberg表
.partitionedBy(months($"Date")) // 按天分区存储,这里使用了转换函数months
.create()
在对分区表进行写操作之前,Iceberg要求根据每个任务(Spark分区)的分区规范对数据进行排序。这既适用于SQL写入,也适用于DataFrame写入。如果使用DataFrame插入数据,则可以使用orderBy()/sort()来触发全局排序,或使用sortWithinPartitions()来触发局部排序(全局排序(orderBy()/sort())和本地排序(sortWithinPartitions())都可以满足需求)。
上面的代码中,对Date字段应用了日期转换函数months(),以指定按Date字段中的月份来分区存储。Iceberg支持的转换函数有:
- (1) years(ts):按年分区。
- (2) months(ts):按月分区。
- (3) days(ts)或date(ts):按天分区。
- (4) hours(ts)或date_hour(ts):按天和小时分区。
- (5) bucket(N, col):按哈希值对N个分桶取模来分区。
- (6) truncate(L, col):以截断为L的值进行分区。字符串会被截断到给定的长度。Integer和Long会被截断到bins:truncate(10,i)产生分区0、10、20、30、...。
执行上面的代码之后,可以查看相应的物理存储目录。这既可以通过hdfs shell命令来查看,也可以通过HDFS的Web UI来查看。通过Web UI查看的部分分区存储目录,它们位于表stock_part的data目录下,如图11-1所示。
下面执行条件查询,查询出2022年2月份的京东股票数据,代码如下:
// 查询
spark.sql("""
select *
from hadoop_prod.jd_db.stock_part
where date>'2022-01-31'
""").show()
执行以上代码,查询结果如下:
+----------+-----+--------+------+-----+------+ | Date|Close| Volume| Open| High| Low| +----------+-----+--------+------+-----+------+ |2022-02-15|76.13| 6766205| 75.35|76.35| 74.8| |2022-02-14|74.45| 5244967| 73.94|74.62| 73.01| |2022-02-11|73.98| 6673354| 75.97|76.55| 73.55| |2022-02-10| 76.4| 6432184|75.955|78.39| 75.24| |2022-02-09|78.29| 7061571| 76.83|78.67| 76.61| |2022-02-08|75.36| 7903249| 73.12|76.07| 72.05| |2022-02-07|73.15| 6135832| 74.09|74.99| 72.81| |2022-02-04|73.77| 6082889| 71.94|74.95| 71.86| |2022-02-03|71.85| 7493688| 72.08| 73.3| 71.33| |2022-02-02|73.21| 5887066| 75.58|75.71| 72.41| |2022-02-01|75.08| 7408338| 74.26|75.89| 73.84| |2022-01-31|74.88| 9331051| 71.84|74.95| 71.79| |2022-01-28|69.34| 9026547| 67.6|69.42| 66.2| |2022-01-27|66.68|13137080| 70.75| 70.8|65.785| |2022-01-26|71.11| 9546074| 72.9|73.49| 70.51| |2022-01-25|72.15| 7291178| 70.28|72.97| 70.2| |2022-01-24|71.74|15503400| 72.52|72.52| 68.11| |2022-01-21|73.46|14440500|77.602|77.68| 72.66| |2022-01-20| 77.0|26218260| 77.25|81.24| 76.77| |2022-01-19| 72.3| 6318328| 73.18|74.22| 72.03| +----------+-----+--------+------+-----+------+ only showing top 20 rows
上面的例子中,使用了DataFrameWriterV2 API写入分区表数据。Iceberg也支持使用SQL创建分区表和写入数据。例如,使用partitioned by创建一个分区表,应用SQL语句,代码如下:
// 先创建表
spark.sql("""
create table hadoop_prod.jd_db.stock_part2(
Date date,
Close double,
Volume bigint,
Open double,
High double,
Low double
)
using iceberg
partitioned by(years(Date)) // 按年分区
""")
PARTITIONED BY子句支持转换表达式来创建隐藏的分区,上面代码中通过years()转换指定按年进行分区。下面使用INSERT INTO ... SELECT ...语句插入数据,代码如下:
// 写入分区表数据
spark.sql("""
INSERT INTO hadoop_prod.jd_db.stock_part2
SELECT Date,Close,Volume,Open,High,Low
FROM hadoop_prod.jd_db.stock_part
ORDER BY Date
""")
// 查看写入是否成功
spark.table("hadoop_prod.jd_db.stock_part2").show(5)
执行以上代码,输出结果如下:
+----------+-----+-------+-----+------+-----+ | Date|Close| Volume| Open| High| Low| +----------+-----+-------+-----+------+-----+ |2017-02-16|30.23|7694667|30.32| 30.57|30.03| |2017-02-17|29.85|7080140|29.57| 30.27|29.51| |2017-02-21|30.23|5131289| 30.0|30.285|29.81| |2017-02-22|30.47|6604136|30.27| 30.67| 30.1| |2017-02-23|30.61|9921265|30.75| 30.88|30.23| +----------+-----+-------+-----+------+-----+ only showing top 5 rows
然后通过Web UI查看物理分区布局,如图11-2所示。
1.RTAS(replace table ... as select)
当使用SparkCatalog时,Iceberg支持RTAS作为原子操作。也可以使用RTAS方法根据已有的表来替换并导入一个新的分区表。下面的示例中创建了分区表stock_part2,指定按股票交易日期中的月份来分区,其数据来源于对stock_part表的查询,代码如下:
// 覆盖stock_part2 表,改为按月分区
spark.sql("""
REPLACE TABLE hadoop_prod.jd_db.stock_part2
USING iceberg
PARTITIONED BY (months(Date))
AS SELECT Date,Close,Volume,Open,High,Low
FROM hadoop_prod.jd_db.stock_part
ORDER BY Date
""")
执行以上代码后,查看stock_part2表的物理布局,可以看到每个月份的交易数据单独存储在一个分区中,结构如图11-3所示。
2.CTAS(create table ... as select)
在使用SparkCatalog时,Iceberg支持CTAS作为原子操作。也可以使用CTAS方法根据已有的表来创建并导入一个新的分区表。下面的代码创建了分区表stock_part3,指定按股票交易日期中的月份来分区,其数据来源于对stock_part表中所有2021年交易数据的查询。代码如下:
// 将2021年的交易记录查询出来,单独写到分区表stock_part3表中,并按月分区存储
spark.sql("""
CREATE TABLE hadoop_prod.jd_db.stock_part3
USING iceberg
PARTITIONED BY (months(Date))
AS SELECT Date,Close,Volume,Open,High,Low
FROM hadoop_prod.jd_db.stock_part
WHERE year(Date)="2021"
ORDER BY Date
""")
执行以上代码后,查看stock_part3表的物理布局,可以看到2021年每个月份的交易数据单独存储在一个分区中,结构如图11-4所示。
3.桶表
对于大多数分区转换,可以简单地将原始列添加到排序条件中,但bucket()除外。对于bucket()分区转换,需要在Spark中注册Iceberg转换函数,以便在排序过程中指定它。
下面是另一个有桶分区的示例。首先创建一个桶表,按Date字段分,代码如下:
spark.sql("""
create table hadoop_prod.jd_db.stock_part4(
Date date,
Close double,
Volume bigint,
Open double,
High double,
Low double
)
using iceberg
partitioned by(bucket(5, Date))
""")
需要注册函数来处理bucket,注册代码如下:
import org.apache.iceberg.spark.IcebergSpark import org.apache.spark.sql.types.DataTypes IcebergSpark.registerBucketUDF(spark, "stock_bucket5", DataTypes.DateType, 5)
这里将bucket函数注册为stock_bucket5,它可以在sort子句中使用。
如果用SQL语句插入数据,可以像下面这样使用该函数,代码如下:
spark.sql("""
INSERT INTO hadoop_prod.jd_db.stock_part4
SELECT Date,Close,Volume,Open,High,Low
FROM hadoop_prod.jd_db.stock_part
ORDER BY stock_bucket5(Date)
""")
如果用DataFrame插入数据,可以像下面这样使用该函数,代码如下:
spark
.table("hadoop_prod.jd_db.stock_part")
.sortWithinPartitions(expr("stock_bucket5(Date)"))
.writeTo("hadoop_prod.jd_db.stock_part4")
.append()
查看stock_part4表的物理布局,可以看到有5个与桶对应的文件夹,如图11-5所示。
还支持使用CTAS创建分桶表,代码如下:
spark
.table("hadoop_prod.jd_db.stock_part")
.sortWithinPartitions(expr("stock_bucket5(Date)"))
.writeTo("hadoop_prod.jd_db.stock_part5")
.partitionedBy(bucket(5, $"Date"))
.create()
查看stock_part5表的物理布局,同样可以看到有5个与桶对应的文件夹,如图11-6所示。
也可以混合使用分桶和分区。例如,先把数据分为5个桶,每个桶下面再按月份分区(这里使用CTAS方法,创建了表stock_part6),代码如下:
// 先分桶,再在每个桶下面分区
spark
.table("hadoop_prod.jd_db.stock_part")
.sortWithinPartitions(expr("stock_bucket5(Date)"))
.writeTo("hadoop_prod.jd_db.stock_part6")
.partitionedBy(bucket(5, $"Date"), months($"Date"))
.create()
查看stock_part6表的物理布局,可以看到有5个与桶对应的文件夹,如图11-7所示。
导航到stock_part6表的每个桶文件夹中,可以看到每个桶中按月进一分进行了分区。如图11-8所示。
也可以先分区,每个分区下面再分桶。在下面的代码中,先把数据按年份进行分区,每个分区下面再按Volumn列分为6个桶(纯粹出于演示的目的选择Volumn列)。为此,需要先将bucket函数注册为stock_bucket6,在sort子句中使用,代码如下:
// 需要注册函数来处理bucket,如下所示。 import org.apache.iceberg.spark.IcebergSpark import org.apache.spark.sql.types.DataTypes IcebergSpark.registerBucketUDF(spark, "stock_bucket6", DataTypes.LongType, 6)
然后使用CTAS创建一个新的表stock_part7并插入数据,代码如下:
spark
.table("hadoop_prod.jd_db.stock_part")
.sortWithinPartitions(expr("stock_bucket6(Volume)"))
.writeTo("hadoop_prod.jd_db.stock_part7")
.partitionedBy(years($"Date"), bucket(6, $"Volume"))
.create()
查看stock_part7表的物理布局,可以看到每年有一个对应的文件夹,如图11-9所示。
导航到stock_part7表的每个分区文件夹中,可以看到每个分区中进一分划分为6个桶。如图11-10所示。
4.DELETE FROM
Spark 3增加了对DELETE FROM查询的支持,可以从表中删除数据。
Delete查询接受一个过滤器来匹配要删除的行,代码如下:
spark.sql("""
DELETE FROM hadoop_prod.jd_db.stock_part
WHERE Date >= '2020-05-01 00:00:00' and Date < '2020-06-01 00:00:00' """)
如果删除过滤器匹配表的整个分区,Iceberg将执行一个只删除元数据的操作。如果过滤器匹配表中的各个行,那么Iceberg将只重写受影响的数据文件。