流读取和流写入

Apache Iceberg使用Apache Spark的DataSourceV2 API实现数据源和catalog目录实现。Spark DSv2是一个不断发展的API,在Spark版本中提供了不同级别的支持。

从Spark 3.0开始,支持DataFrame读写。

流读取

可以使用快照历史记录来支持从Iceberg表中作为流源读取数据。Iceberg支持处理spark结构化流作业中的增量数据,这些作业从历史时间戳开始,模板代码如下:

val df = spark.readStream
    .format("iceberg")
    .option("stream-from-timestamp", Long.toString(streamStartTimestamp))
    .load("database.table_name")

Iceberg仅支持从append快照读取数据,而overwrite快照不能够被处理,会引起异常。类似地,默认情况下delete快照会导致异常,但是通过设置stream-skip-delete-snapshot=true可以忽略delete。

流写入

使用Iceberg表作为Spark结构化流的Data Sink(数据接收器)。Spark为所有的sink提供了queryId和epochId,必须保证所有的写操作都是幂等的。Spark可能会尝试多次提交同一个批处理。因此,用户需要知道每个查询最新提交的epochId。一种方法是在快照摘要中持久化queryId和epochId。在写操作上,可以简单地遍历快照并检查给定查询最新提交的epochId,以使写操作幂等。

要将流查询的值写入Iceberg表,使用DataStreamWriter,模板代码如下:

......

......

抱歉,只有登录会员才可浏览!会员登录


《PySpark原理深入与编程实战》