增量查询与时间点查询
增量查询
Hudi还提供了获取自给定提交时间戳以来更改的记录流的功能。这可以通过使用Hudi的增量查询来实现,并提供一个开始时间,更改需要从该时间流开始。如果想要在提交之后进行的所有更改(这是常见的情况),则不需要指定endTime。代码如下:
// spark-shell // 重新加载数据 spark. read. format("hudi"). load(basePath). createOrReplaceTempView("hudi_trips_snapshot") val commits = spark.sql(""" select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime """).map(k => k.getString(0)).take(50) // commits.foreach(println) val beginTime = commits(commits.length - 2) // 我们感兴趣的提交时间 // 增量查询数据 val tripsIncrementalDF = spark.read.format("hudi"). option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath) // 注册到临时表中 tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") // 查询 spark.sql(""" ............
抱歉,只有登录会员才可浏览!会员登录