流读取和流写入
Apache Iceberg使用Apache Spark的DataSourceV2 API实现数据源和catalog目录实现。Spark DSv2是一个不断发展的API,在Spark版本中提供了不同级别的支持。
从Spark 3.0开始,支持DataFrame读写。
流读取
可以使用快照历史记录来支持从Iceberg表中作为流源读取数据。Iceberg支持处理spark结构化流作业中的增量数据,这些作业从历史时间戳开始,模板代码如下:
val df = spark.readStream .format("iceberg") .option("stream-from-timestamp", Long.toString(streamStartTimestamp)) .load("database.table_name")
Iceberg仅支持从append快照读取数据,而overwrite快照不能够被处理,会引起异常。类似地,默认情况下delete快照会导致异常,但是通过设置stream-skip-delete-snapshot=true可以忽略delete。
流写入
使用Iceberg表作为Spark结构化流的Data Sink(数据接收器)。Spark为所有的sink提供了queryId和epochId,必须保证所有的写操作都是幂等的。Spark可能会尝试多次提交同一个批处理。因此,用户需要知道每个查询最新提交的epochId。一种方法是在快照摘要中持久化queryId和epochId。在写操作上,可以简单地遍历快照并检查给定查询最新提交的epochId,以使写操作幂等。
要将流查询的值写入Iceberg表,使用DataStreamWriter,模板代码如下:
............
抱歉,只有登录会员才可浏览!会员登录