发布日期:2026-04-04

通过文件管理提升性能

为了提升查询速度,Delta Lake 支持优化数据在存储中的布局能力。有多种方式可以对数据布局进行优化。

压缩(装箱)

注意: 此功能在 Delta Lake 1.2.0 及更高版本中可用。

Delta Lake 可以通过将小文件合并为较大的文件来提高对表的读取查询速度。

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)  // 用于基于Path的表
// 对于基于Hive metastore的表: val deltaTable = DeltaTable.forName(spark, tableName)

deltaTable.optimize().executeCompaction()

// 如果有大量的数据,但是只想优化其一个子集,可以使用`where`来指定一个可选的分区谓词
deltaTable.optimize().where("date='2021-11-18'").executeCompaction()

注意:

  • 装箱优化是幂等的,这意味着如果对同一数据集运行两次,第二次运行不会产生任何效果。
  • 装箱优化的目标是生成在磁盘大小上均匀平衡的数据文件,但不一定保证每个文件中的元组数量相同。不过,这两个指标通常是相关的。
  • 用于执行 OPTIMIZE 操作的 Python 和 Scala API 从 Delta Lake 2.0 及以上版本开始提供。
  • 设置 Spark 会话配置项 spark.databricks.delta.optimize.repartition.enabled=true,可以在压缩大量小文件时使用 repartition(1) 替代 coalesce(1),以获得更好的性能。

Delta 表的读取操作使用快照隔离(snapshot isolation),这意味着当 OPTIMIZE 从事务日志中移除不必要的文件时,读取操作不会被中断。OPTIMIZE 不会对表进行任何数据相关的更改,因此在执行 OPTIMIZE 前后进行读取,得到的结果是相同的。对作为流式源的表执行 OPTIMIZE,不会影响任何将该表作为源的当前或未来流。OPTIMIZE 会返回文件统计信息(最小值、最大值、总数等),包括操作中被移除的文件和新增的文件。优化统计信息还包含批次数以及已优化的分区数。

自动压缩

注意: 此功能在 Delta Lake 3.1.0 及以上版本中可用。

自动压缩会将 Delta 表分区内的小文件合并,从而自动减少小文件问题。自动压缩在表写入成功之后执行,并在执行写入操作的集群上同步运行。自动压缩只会压缩之前未被压缩过的文件。

可以通过设置配置项 spark.databricks.delta.autoCompact.maxFileSize 来控制输出文件的大小。

自动压缩仅在分区或表中至少存在一定数量的小文件时才会触发。可以通过设置 spark.databricks.delta.autoCompact.minNumFiles 来选择性地更改触发自动压缩所需的最小文件数量。

自动压缩可以通过以下设置在表级别或会话级别启用:

  • 表属性:delta.autoOptimize.autoCompact
  • SparkSession 设置:spark.databricks.delta.autoCompact.enabled

这些设置接受以下选项:

  • true:启用自动压缩。默认情况下将使用 128 MB 作为目标文件大小。
  • false:关闭自动压缩。可以在会话级别进行设置,以覆盖工作负载中修改的所有 Delta 表的自动压缩设置。

数据跳过

注意: 此功能在 Delta Lake 1.2.0 及以上版本中可用。

当向 Delta Lake 表写入数据时,数据跳过的信息会自动收集。Delta Lake 在查询时会利用这些信息(每列的最小值和最大值)来提供更快的查询速度。无需配置数据跳过功能;该功能在适用时自动激活。不过,其效果取决于数据的布局。为获得最佳效果,建议采用 Z-Ordering。

对包含长值(如字符串或二进制类型)的列收集统计信息是一项开销较大的操作。为了避免对这些列收集统计信息,可以配置表属性 delta.dataSkippingNumIndexedCols。该属性表示列在表结构中的位置索引。所有位置索引小于 delta.dataSkippingNumIndexedCols 属性值的列都将被收集统计信息。出于收集统计信息的目的,嵌套列中的每个字段被视为独立的列。为避免对包含长值的列收集统计信息,可以采取以下任一方式:

  • 设置 delta.dataSkippingNumIndexedCols 属性,使长值列在表结构中的位置位于该索引之后;
  • 或者使用 ALTER TABLE ALTER COLUMN 将包含长字符串的列移动到索引位置大于 delta.dataSkippingNumIndexedCols 属性的位置。

Z-Ordering(多维聚类)

注意: 此功能在 Delta Lake 2.0.0 及以上版本中可用。

Z-Ordering 是一种将相关信息存放在同一组文件中的技术。Delta Lake 会在数据跳过算法中自动利用这种共置特性,从而显著减少 Apache Spark 上的 Delta Lake 需要读取的数据量。要使用 Z-Ordering 对数据进行排序,请在 ZORDER BY 子句中指定用于排序的列:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)  // 基于path路径的表
// 对于基于Hive metastore的表: val deltaTable = DeltaTable.forName(spark, tableName)

deltaTable.optimize().executeZOrderBy(eventType)

// 如果有大量的数据但只想优化其一个子集,可以通过使用"where"来指定一个可选分区谓词
deltaTable.optimize().where("date='2021-11-18'").executeZOrderBy(eventType)

如果预期某列会经常在查询谓词中使用,且该列的基数较高(即存在大量不同值),则建议使用 ZORDER BY。

可以指定多个列作为 ZORDER BY 的参数,用逗号分隔。但是,随着额外列的增加,数据共置的效果会逐渐减弱。对未收集统计信息的列进行 Z-Ordering 将无法生效,并且会浪费资源。这是因为数据跳过需要基于列的统计信息,如最小值、最大值和计数。可以通过重新排列表结构中的列顺序来配置对特定列的统计信息收集,也可以增加收集统计信息的列数量。

注意:

1. Z-Ordering 不是幂等的。每次执行 Z-Ordering 时,它都会尝试在分区内的所有文件(包括新文件以及之前 Z-Ordering 处理过的现有文件)中重新创建数据的聚类布局。

2. Z-Ordering 的目标是生成在元组数量上均匀平衡的数据文件,但不一定保证磁盘上的数据大小均匀。这两个指标通常是相关的,但在某些情况下可能不一致,从而导致优化任务的时间出现倾斜。

例如,如果按 date 列进行 Z-Ordering,并且最近的记录在数据宽度上(例如更长的数组或字符串值)明显大于历史记录,那么可以预见 OPTIMIZE 作业的任务执行时长以及生成的文件大小都会出现倾斜。不过,这仅仅是 OPTIMIZE 命令本身的问题,不会对后续查询产生任何负面影响。

多部分检查点

注意: 此功能在 Delta Lake 2.0.0 及以上版本中可用。该功能目前处于实验性支持阶段。

Delta Lake 表会自动定期将 Delta 日志中的所有增量更新压缩到一个 Parquet 文件中。这种“检查点”机制使得读取查询能够快速重建表的当前状态(即需要处理哪些文件、当前表结构是什么),而无需读取过多包含增量更新的文件。

Delta Lake 协议支持将检查点拆分为多个 Parquet 文件。这样可以并行化并加速检查点的写入过程。在 Delta Lake 中,默认情况下每个检查点都写为单个 Parquet 文件。要使用此功能,请设置 SQL 配置项 spark.databricks.delta.checkpoint.partSize=<n>,其中 n 是操作(例如 AddFile)的数量阈值。当 Apache Spark 上的 Delta Lake 达到此阈值时,将开始并行化检查点写入,并尝试在每个检查点文件中最多写入该数量的操作。

注意: 此功能不需要对读取端进行任何配置更改。现有的读取端已经支持读取包含多个文件的检查点。

日志压缩

注意: 此功能在 Delta Lake 3.0.0 及以上版本中可用。

Delta Lake 协议支持使用格式为 <x>.<y>.compact.json 的新日志压缩文件。这些文件包含了提交范围 [x, y] 内的聚合操作。日志压缩可以减少对频繁检查点的需求,并最大程度地减少检查点所带来的延迟峰值。

对日志压缩文件的读取支持在 Delta Lake 3.0.0 及以上版本中可用。该功能默认启用,可以通过 SQL 配置项 spark.databricks.delta.deltaLog.minorCompaction.useForReads=<value> 禁用它,其中 value 可以是 true 或 false。对日志压缩的写入支持将在未来的 Delta 版本中添加。

优化写入

注意: 此功能在 Delta Lake 3.1.0 及以上版本中可用。

优化写入可以在数据写入时改善文件大小,并有利于后续对表的读取操作。

优化写入对于分区表最为有效,因为它减少了写入每个分区的小文件数量。写入较少的大文件比写入许多小文件效率更高,但仍可能会观察到写入延迟有所增加,这是因为数据在写入之前需要经过 shuffle 重分布。

下图演示了优化写入的工作原理:

优化写入工作原理

注意:可能有一段代码,在将数据写出之前运行 coalesce(n) 或 repartition(n) 来控制写入的文件数量。优化写入消除了使用这种模式的需要。

优化写入功能默认是禁用的。可以通过以下设置(按优先级从低到高排列)在表级别、SQL 会话级别和/或 DataFrameWriter 级别启用:

  • delta.autoOptimize.optimizeWrite 表属性(默认值为 None);
  • spark.databricks.delta.optimizeWrite.enabled SQL 配置项(默认值为 None);
  • DataFrameWriter 选项 optimizeWrite(默认值为 None)。

除上述设置外,还可以使用以下高级 SQL 配置项来进一步微调写入文件的数量和大小:

  • spark.databricks.delta.optimizeWrite.binSize(默认值为 512 MiB),用于控制每个输出文件的目标内存大小;
  • spark.databricks.delta.optimizeWrite.numShuffleBlocks(默认值为 50,000,000),用于控制“目标的最大 shuffle 块数量”;
  • spark.databricks.delta.optimizeWrite.maxShufflePartitions(默认值为 2,000),用于控制“优化写入可使用的最大输出桶(reducer)数量”。