发布日期:2022-10-20 VIP内容

使用分区表和分桶表

分区是一种优化技术,用于根据某些属性将表划分为若干部分。

通常,分区只是将特定类型或值的项分组以便更快地访问的一种方法。这样做的好处是,对于只访问部分数据的查询,读取和加载时间会更快。例如,跟踪日志事件、消息和事件时间的日志表可能有数百万个条目,跨度长达数月。将这些条目按天进行分区,可以更快地查询某一天发生的日志事件。

Iceberg通过实现隐藏分区为用户简化了分区。Iceberg没有强制用户在查询时提供单独的分区过滤器,而是在底层处理分区和查询的所有细节。用户不需要维护分区列,甚至不需要理解物理表布局就可以得到准确的查询结果。

Iceberg有几个分区选项。用户可以按年、月、日和小时划分时间戳。Iceberg可以跟踪列值与其分区之间的关系,而不需要额外的列。例如,当在按日(day)分区、带有“YYYY-MM-DD hh:mm:ss”时间戳格式的数据上查询时,查询语句中不需要包含“hh:mm:ss”部分。Iceberg还可以通过标识、散列桶或截断来划分分类列值。

下面通过一个示例来了解如何使用Iceberg分区表,以及Iceberg分区表的一些特性。本示例需要一个数据集文件jd-formated.parquet,该文件包含2017年02月16日到2022年02月15日共五年的京东股票交易数据。首先将该数据集加载到DataFrame中,代码如下:

// 加载数据集到DataFrame
val file = "/data/spark/jd/jd-formated.parquet" 
val jdDF = spark.read.load(file)

// 查看schema和部分数据
jdDF.printSchema()
jdDF.show()

执行以上代码,输出内容如下:

root
 |-- Date: date (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)

+----------+-----+-------+------+-----+-----+
|       Date|Close| Volume|   Open| High|  Low|
+----------+-----+-------+------+-----+-----+
|2022-02-15|76.13|6766205| 75.35|76.35| 74.8|
|2022-02-14|74.45|5244967| 73.94|74.62|73.01|
|2022-02-11|73.98|6673354| 75.97|76.55|73.55|
|2022-02-10| 76.4|6432184|75.955|78.39|75.24|
|2022-02-09|78.29|7061571| 76.83|78.67|76.61|
+----------+-----+-------+------+-----+-----+
only showing top 5 rows

接下来,将jdDF写入到Iceberg数据湖中。Spark 3引入了新的DataFrameWriterV2 API,用于使用DataFrame写入表。推荐使用v2 API有以下几个原因:

(1) 支持CTAS、RTAS和过滤器覆盖。

(2) 所有操作都一致地按名称将列写入表。

(3) partitionedBy支持隐藏分区表达式。

(4) 覆盖行为是显式的,可以是动态的,也可以是用户提供的过滤器。

(5) 每个操作的行为都对应于SQL语句。

  • df.writeTo(t).create() 等价于 CREATE TABLE AS SELECT。
  • df.writeTo(t).replace() 等价于 REPLACE TABLE AS SELECT。
  • df.writeTo(t).append() 等价于 INSERT INTO。
  • df.writeTo(t).overwritePartitions() 等价于 dynamic INSERT OVERWRITE。

(6) 创建表。

要运行一个CTAS或RTAS,使用create、replace或createOrReplace操作,代码如下:

val data: DataFrame = ...
data.writeTo("prod.db.table").create()

另外,create、replace或createOrReplace操作还支持表配置方法,如partitionedBy()和tableProperty(),代码如下:

data.writeTo("prod.db.table")
    .tableProperty("write.format.default", "orc")
    .partitionBy($"level", days($"ts"))
    .createOrReplace()

(7) 覆盖数据。

要动态地覆盖分区,可以使用overwritePartitions(),代码如下:

val data: DataFrame = ...
data.writeTo("prod.db.table").overwritePartitions()

要显式覆盖分区,使用overwrite来提供一个过滤器,代码如下:

data.writeTo("prod.db.table").overwrite($"level" === "INFO")

注意: 虽然Spark 3仍然支持v1 DataFrame write API,但不推荐使用。在Spark 3中使用v1 DataFrame API写数据时,可以使用saveAsTable或insertInto来加载带有catalog目录的表。使用format("iceberg")加载一个独立的表引用,该引用不会自动刷新查询使用的表。

继续使用前面几节定义的hadoop_prod catalog和my_db数据库,代码如下:

// 写数据到分区表
jdDF
    .sortWithinPartitions("Date")		// 要先排序
    .writeTo("hadoop_prod.jd_db.stock_part")	// 写入iceberg表
    .partitionedBy(months($"Date"))  // 按天分区存储,这里使用了转换函数months
    .create()

在对分区表进行写操作之前,Iceberg要求根据每个任务(Spark分区)的分区规范对数据进行排序。这既适用于SQL写入,也适用于DataFrame写入。如果使用DataFrame插入数据,则可以使用orderBy()/sort()来触发全局排序,或使用sortWithinPartitions()来触发局部排序(全局排序(orderBy()/sort())和本地排序(sortWithinPartitions())都可以满足需求)。

上面的代码中,对Date字段应用了日期转换函数months(),以指定按Date字段中的月份来分区存储。Iceberg支持的转换函数有:

  • (1) years(ts):按年分区。
  • (2) months(ts):按月分区。
  • (3) days(ts)或date(ts):按天分区。
  • (4) hours(ts)或date_hour(ts):按天和小时分区。
  • (5) bucket(N, col):按哈希值对N个分桶取模来分区。
  • (6) truncate(L, col):以截断为L的值进行分区。字符串会被截断到给定的长度。Integer和Long会被截断到bins:truncate(10,i)产生分区0、10、20、30、...。

执行上面的代码之后,可以查看相应的物理存储目录。这既可以通过hdfs shell命令来查看,也可以通过HDFS的Web UI来查看。通过Web UI查看的部分分区存储目录,它们位于表stock_part的data目录下,如图11-1所示。

下面执行条件查询,查询出2022年2月份的京东股票数据,代码如下:

// 查询
spark.sql("""
    select * 
    from hadoop_prod.jd_db.stock_part 
    where date>'2022-01-31'
""").show()

执行以上代码,查询结果如下:

+----------+-----+--------+------+-----+------+
|       Date|Close|  Volume|   Open| High|   Low|
+----------+-----+--------+------+-----+------+
|2022-02-15|76.13| 6766205| 75.35|76.35|  74.8|
|2022-02-14|74.45| 5244967| 73.94|74.62| 73.01|
|2022-02-11|73.98| 6673354| 75.97|76.55| 73.55|
|2022-02-10| 76.4| 6432184|75.955|78.39| 75.24|
|2022-02-09|78.29| 7061571| 76.83|78.67| 76.61|
|2022-02-08|75.36| 7903249| 73.12|76.07| 72.05|
|2022-02-07|73.15| 6135832| 74.09|74.99| 72.81|
|2022-02-04|73.77| 6082889| 71.94|74.95| 71.86|
|2022-02-03|71.85| 7493688| 72.08| 73.3| 71.33|
|2022-02-02|73.21| 5887066| 75.58|75.71| 72.41|
|2022-02-01|75.08| 7408338| 74.26|75.89| 73.84|
|2022-01-31|74.88| 9331051| 71.84|74.95| 71.79|
|2022-01-28|69.34| 9026547|  67.6|69.42|   66.2|
|2022-01-27|66.68|13137080| 70.75| 70.8|65.785|
|2022-01-26|71.11| 9546074|  72.9|73.49| 70.51|
|2022-01-25|72.15| 7291178| 70.28|72.97|  70.2|
|2022-01-24|71.74|15503400| 72.52|72.52| 68.11|
|2022-01-21|73.46|14440500|77.602|77.68| 72.66|
|2022-01-20| 77.0|26218260| 77.25|81.24| 76.77|
|2022-01-19| 72.3| 6318328| 73.18|74.22| 72.03|
+----------+-----+--------+------+-----+------+
only showing top 20 rows

上面的例子中,使用了DataFrameWriterV2 API写入分区表数据。Iceberg也支持使用SQL创建分区表和写入数据。例如,使用partitioned by创建一个分区表,应用SQL语句,代码如下:

// 先创建表
spark.sql("""
   create table hadoop_prod.jd_db.stock_part2(
        Date date,
        Close double,
        Volume bigint,
        Open double,
        High double,
        Low double
   ) 
   using iceberg 
   partitioned by(years(Date))     // 按年分区
""") 

PARTITIONED BY子句支持转换表达式来创建隐藏的分区,上面代码中通过years()转换指定按年进行分区。下面使用INSERT INTO ... SELECT ...语句插入数据,代码如下:

// 写入分区表数据
spark.sql("""
    INSERT INTO hadoop_prod.jd_db.stock_part2 
    SELECT Date,Close,Volume,Open,High,Low
    FROM hadoop_prod.jd_db.stock_part
    ORDER BY Date
""")

// 查看写入是否成功
spark.table("hadoop_prod.jd_db.stock_part2").show(5)

执行以上代码,输出结果如下:

+----------+-----+-------+-----+------+-----+
|       Date|Close| Volume| Open|   High|  Low|
+----------+-----+-------+-----+------+-----+
|2017-02-16|30.23|7694667|30.32| 30.57|30.03|
|2017-02-17|29.85|7080140|29.57| 30.27|29.51|
|2017-02-21|30.23|5131289| 30.0|30.285|29.81|
|2017-02-22|30.47|6604136|30.27| 30.67| 30.1|
|2017-02-23|30.61|9921265|30.75| 30.88|30.23|
+----------+-----+-------+-----+------+-----+
only showing top 5 rows

然后通过Web UI查看物理分区布局,如图11-2所示。

1.RTAS(replace table ... as select)

当使用SparkCatalog时,Iceberg支持RTAS作为原子操作。也可以使用RTAS方法根据已有的表来替换并导入一个新的分区表。下面的示例中创建了分区表stock_part2,指定按股票交易日期中的月份来分区,其数据来源于对stock_part表的查询,代码如下:

// 覆盖stock_part2 表,改为按月分区
spark.sql("""
    REPLACE TABLE hadoop_prod.jd_db.stock_part2 
    USING iceberg
    PARTITIONED BY (months(Date))
    AS SELECT Date,Close,Volume,Open,High,Low
       FROM hadoop_prod.jd_db.stock_part
       ORDER BY Date
""")

执行以上代码后,查看stock_part2表的物理布局,可以看到每个月份的交易数据单独存储在一个分区中,结构如图11-3所示。

2.CTAS(create table ... as select)

在使用SparkCatalog时,Iceberg支持CTAS作为原子操作。也可以使用CTAS方法根据已有的表来创建并导入一个新的分区表。下面的代码创建了分区表stock_part3,指定按股票交易日期中的月份来分区,其数据来源于对stock_part表中所有2021年交易数据的查询。代码如下:

// 将2021年的交易记录查询出来,单独写到分区表stock_part3表中,并按月分区存储
spark.sql("""
    CREATE TABLE hadoop_prod.jd_db.stock_part3 
    USING iceberg
    PARTITIONED BY (months(Date))
    AS SELECT Date,Close,Volume,Open,High,Low
       FROM hadoop_prod.jd_db.stock_part
       WHERE year(Date)="2021"
       ORDER BY Date
""")

执行以上代码后,查看stock_part3表的物理布局,可以看到2021年每个月份的交易数据单独存储在一个分区中,结构如图11-4所示。

3.桶表

对于大多数分区转换,可以简单地将原始列添加到排序条件中,但bucket()除外。对于bucket()分区转换,需要在Spark中注册Iceberg转换函数,以便在排序过程中指定它。

下面是另一个有桶分区的示例。首先创建一个桶表,按Date字段分,代码如下:

spark.sql("""
   create table hadoop_prod.jd_db.stock_part4(
        Date date,
        Close double,
        Volume bigint,
        Open double,
        High double,
        Low double
   ) 
   using iceberg 
   partitioned by(bucket(5, Date))
""")

需要注册函数来处理bucket,注册代码如下:

import org.apache.iceberg.spark.IcebergSpark
import org.apache.spark.sql.types.DataTypes

IcebergSpark.registerBucketUDF(spark, "stock_bucket5", DataTypes.DateType, 5)

这里将bucket函数注册为stock_bucket5,它可以在sort子句中使用。

如果用SQL语句插入数据,可以像下面这样使用该函数,代码如下:

spark.sql("""
    INSERT INTO hadoop_prod.jd_db.stock_part4
    SELECT Date,Close,Volume,Open,High,Low 
    FROM hadoop_prod.jd_db.stock_part
    ORDER BY stock_bucket5(Date) 
""")

如果用DataFrame插入数据,可以像下面这样使用该函数,代码如下:

spark
    .table("hadoop_prod.jd_db.stock_part")
    .sortWithinPartitions(expr("stock_bucket5(Date)"))
    .writeTo("hadoop_prod.jd_db.stock_part4")
    .append()

查看stock_part4表的物理布局,可以看到有5个与桶对应的文件夹,如图11-5所示。

还支持使用CTAS创建分桶表,代码如下:

spark
    .table("hadoop_prod.jd_db.stock_part")
    .sortWithinPartitions(expr("stock_bucket5(Date)"))
    .writeTo("hadoop_prod.jd_db.stock_part5")
    .partitionedBy(bucket(5, $"Date"))
    .create()

查看stock_part5表的物理布局,同样可以看到有5个与桶对应的文件夹,如图11-6所示。

也可以混合使用分桶和分区。例如,先把数据分为5个桶,每个桶下面再按月份分区(这里使用CTAS方法,创建了表stock_part6),代码如下:

// 先分桶,再在每个桶下面分区
spark
    .table("hadoop_prod.jd_db.stock_part")
    .sortWithinPartitions(expr("stock_bucket5(Date)"))
    .writeTo("hadoop_prod.jd_db.stock_part6")
    .partitionedBy(bucket(5, $"Date"), months($"Date"))
    .create()

查看stock_part6表的物理布局,可以看到有5个与桶对应的文件夹,如图11-7所示。

导航到stock_part6表的每个桶文件夹中,可以看到每个桶中按月进一分进行了分区。如图11-8所示。

也可以先分区,每个分区下面再分桶。在下面的代码中,先把数据按年份进行分区,每个分区下面再按Volumn列分为6个桶(纯粹出于演示的目的选择Volumn列)。为此,需要先将bucket函数注册为stock_bucket6,在sort子句中使用,代码如下:

// 需要注册函数来处理bucket,如下所示。
import org.apache.iceberg.spark.IcebergSpark
import org.apache.spark.sql.types.DataTypes

IcebergSpark.registerBucketUDF(spark, "stock_bucket6", DataTypes.LongType, 6) 

然后使用CTAS创建一个新的表stock_part7并插入数据,代码如下:

spark
    .table("hadoop_prod.jd_db.stock_part")
    .sortWithinPartitions(expr("stock_bucket6(Volume)"))
    .writeTo("hadoop_prod.jd_db.stock_part7")
    .partitionedBy(years($"Date"), bucket(6, $"Volume"))
    .create()

查看stock_part7表的物理布局,可以看到每年有一个对应的文件夹,如图11-9所示。

导航到stock_part7表的每个分区文件夹中,可以看到每个分区中进一分划分为6个桶。如图11-10所示。

4.DELETE FROM

Spark 3增加了对DELETE FROM查询的支持,可以从表中删除数据。

Delete查询接受一个过滤器来匹配要删除的行,代码如下:

spark.sql("""
    DELETE FROM hadoop_prod.jd_db.stock_part
    WHERE Date >= '2020-05-01 00:00:00' and Date < '2020-06-01 00:00:00' """) 

如果删除过滤器匹配表的整个分区,Iceberg将执行一个只删除元数据的操作。如果过滤器匹配表中的各个行,那么Iceberg将只重写受影响的数据文件。