更新数据
这类似于插入新数据。使用数据生成器生成对现有行程的更新,加载到DataFrame中,并将DataFrame写入hudi表。代码如下:
// 生成对现有行程进行更新的数据 val updates = convertToStringList(dataGen.generateUpdates(10)) val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) // 以Append模式将新的数据写入到Hudi表中 df.write.format("hudi") .options(getQuickstartWriteConfigs) .option(PRECOMBINE_FIELD_OPT_KEY, "ts") .option(RECORDKEY_FIELD_OPT_KEY, "uuid") .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath") .option(TABLE_NAME, tableName) .mode(SaveMode.Append) // 注意这里,Append .save(basePath)
注意,保存模式现在是SaveMode.Append。一般来说,除非第一次创建表,否则总是使用append模式。再次查询数据将显示更新的行程。每个写操作都会生成一个由时间戳表示的新提交。代码如下:
// 再次从Hudi表中读取数据到DataFrame中 val tripsSnapshotDF2 = spark .read .format("hudi") .load(basePath) tripsSnapshotDF2.show(5) println(tripsSnapshotDF.count) ............
抱歉,只有登录会员才可浏览!会员登录