更新数据
这类似于插入新数据。使用数据生成器生成对现有行程的更新,加载到DataFrame中,并将DataFrame写入hudi表。代码如下:
// 生成对现有行程进行更新的数据
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
// 以Append模式将新的数据写入到Hudi表中
df.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(PRECOMBINE_FIELD_OPT_KEY, "ts")
.option(RECORDKEY_FIELD_OPT_KEY, "uuid")
.option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
.option(TABLE_NAME, tableName)
.mode(SaveMode.Append) // 注意这里,Append
.save(basePath)
注意,保存模式现在是SaveMode.Append。一般来说,除非第一次创建表,否则总是使用append模式。再次查询数据将显示更新的行程。每个写操作都会生成一个由时间戳表示的新提交。代码如下:
// 再次从Hudi表中读取数据到DataFrame中
val tripsSnapshotDF2 = spark
.read
.format("hudi")
.load(basePath)
tripsSnapshotDF2.show(5)
println(tripsSnapshotDF.count) ......
......
抱歉,只有登录会员才可浏览!会员登录