增量查询与时间点查询

增量查询

Hudi还提供了获取自给定提交时间戳以来更改的记录流的功能。这可以通过使用Hudi的增量查询来实现,并提供一个开始时间,更改需要从该时间流开始。如果想要在提交之后进行的所有更改(这是常见的情况),则不需要指定endTime。代码如下:

// spark-shell
// 重新加载数据
spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")

val commits = spark.sql("""
    select distinct(_hoodie_commit_time) as commitTime 
    from  hudi_trips_snapshot 
    order by commitTime
""").map(k => k.getString(0)).take(50)

// commits.foreach(println)

val beginTime = commits(commits.length - 2) 	// 我们感兴趣的提交时间

// 增量查询数据
val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)

// 注册到临时表中
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

// 查询
spark.sql("""
    ......
          

......

抱歉,只有登录会员才可浏览!会员登录


《PySpark原理深入与编程实战》