增量查询与时间点查询
增量查询
Hudi还提供了获取自给定提交时间戳以来更改的记录流的功能。这可以通过使用Hudi的增量查询来实现,并提供一个开始时间,更改需要从该时间流开始。如果想要在提交之后进行的所有更改(这是常见的情况),则不需要指定endTime。代码如下:
// spark-shell
// 重新加载数据
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")
val commits = spark.sql("""
select distinct(_hoodie_commit_time) as commitTime
from hudi_trips_snapshot
order by commitTime
""").map(k => k.getString(0)).take(50)
// commits.foreach(println)
val beginTime = commits(commits.length - 2) // 我们感兴趣的提交时间
// 增量查询数据
val tripsIncrementalDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
load(basePath)
// 注册到临时表中
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
// 查询
spark.sql("""
......
......
抱歉,只有登录会员才可浏览!会员登录