Delta Lake 的 ACID 保证基于存储系统的原子性与持久性保证。具体而言,Delta Lake 在与存储系统交互时依赖以下特性:
- 原子可见性:必须确保一个文件要么完全可见,要么完全不可见。
- 互斥性:在最终目标路径下,同一时间只能有一个写入者能够创建(或重命名)文件。
- 一致性列表:一旦文件被写入某个目录,该目录后续的所有列表操作都必须返回该文件。
由于存储系统未必能直接提供上述所有保证,Delta Lake 的事务操作通常通过 LogStore API 进行,而不是直接访问存储系统。为了针对不同的存储系统提供 ACID 保证,你可能需要使用不同的 LogStore 实现。本文介绍如何为各类存储系统配置 Delta Lake。
存储系统分为两类:
1. 内置支持的存储系统:对于部分存储系统,你无需额外配置。Delta Lake 会根据路径的 scheme(例如 s3a://path 中的 s3a)动态识别存储系统,并使用相应的 LogStore 实现来提供事务保证。不过,对于 S3,并发写入时存在一些额外注意事项。
2. 其他存储系统:与 Apache Spark 类似,LogStore 使用 Hadoop FileSystem API 执行读写操作。因此,Delta Lake 支持在任何提供 FileSystem API 实现的存储系统上进行并发读取。对于需要事务保证的并发写入,根据 FileSystem 实现所提供的保证,分为两种情况:
2.1 如果该实现提供一致性列表和不覆盖的原子重命名(即 rename(..., overwrite = false) 会原子性地生成目标文件,若目标文件已存在则抛出 java.nio.file.FileAlreadyExistsException 失败),那么使用重命名操作的默认 LogStore 实现即可提供有保证的并发写入。
2.2 否则,你必须通过设置以下 Spark 配置来配置自定义的 LogStore 实现:
spark.delta.logStore..impl=<完整限定类名>
其中是你存储系统路径的 scheme。该配置使得 Delta Lake 仅为这些路径动态使用指定的 LogStore 实现。你可以在应用程序中为不同的 scheme 配置多个这样的条目,从而允许同时从不同的存储系统进行读写操作。
注意:在本地文件系统上使用 Delta Lake 可能不支持并发事务写入,因为本地文件系统不一定提供原子性重命名操作。因此,不应使用本地文件系统来测试并发写入。
在 1.0 版本之前,Delta Lake 支持通过设置 spark.delta.logStore.class 来配置 LogStore。这种方式现已弃用。设置该配置后,将导致所有路径都使用同一个配置的 LogStore,从而禁用基于 scheme 的动态委托机制。
排查 Delta 存储依赖错误
如果遇到类似 java.lang.NoClassDefFoundError: io/delta/storage/LogStore 的错误,通常表示在 Spark 类路径中缺失 Delta 存储依赖项。
带有堆栈信息的错误信息如下:
com.google.common.util.concurrent.ExecutionError: java.lang.NoClassDefFoundError: io/delta/storage/LogStore Please ensure that the delta-storage dependency is included. If using Python, please ensure you call `configure_spark_with_delta_pip` or use `--packages io.delta:delta-spark_:`. See https://docs.delta.io/latest/quick-start.html#python. More information about this dependency and how to include it can be found here: https://docs.delta.io/latest/porting.html#delta-lake-1-1-or-below-to-delta-lake-1-2-or-above. at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2084) at com.google.common.cache.LocalCache.get(LocalCache.java:4017) at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4898) at org.apache.spark.sql.delta.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:995) at org.apache.spark.sql.delta.DeltaLog$.initializeDeltaLog$1(DeltaLog.scala:1006) ...
为什么会发生这种情况?
当使用 --jars 选项提供 Delta Spark JAR 时(例如,在测试本地构建的 JAR 时),Spark 不会自动获取其传递性依赖。在这种情况下,delta-spark 可能存在,但 delta-storage 却不存在。
因此,当 Delta 尝试加载 LogStore API 时,那些需要初始化 Delta 日志的操作(例如写入 Delta 表)可能会失败,如下所示:
df.write.format("delta").save("/tmp/delta")
如何修复?
使用 --jars 选项时,可以采取以下任一方法:
1. 同时包含 delta-spark 和 delta-storage 两个 JAR 包:
spark-shell \ --jars delta-spark_-.jar,delta-storage-.jar \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
注意:使用 --jars 指定 JAR 包时,逗号后面不要加空格。
2. 使用 assembly JAR(也称为 uber JAR 或 fat JAR,它已经包含了 delta-storage 及其所有传递依赖):
spark-shell \ --jars delta-spark-assembly-.jar \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
可通过 build/sbt "spark/assembly" 构建assembly JAR:
# 构建汇编 JAR build/sbt "spark/assembly"
assembly JAR 文件通常位于:
delta-spark/target/scala-/delta-spark-assembly-.jar
相比分别指定多个 JAR,使用汇编 JAR 可以避免遗漏依赖的问题,使部署更加简单可靠。
Delta Lake 对 HDFS 的内置支持
Delta Lake 对 HDFS 具有内置支持,可为来自多个集群的并发读写提供完整的事务保证。无需额外配置。
使用方法(HDFS):
spark.range(5).write.format("delta").save("hdfs://:/")
spark.read.format("delta").load("hdfs://:/").show()