合并更新(upsert)
这一节,将解释如何更新表列,以及如何使用merge命令执行upsert操作。
Delta Lake存储使用parquet文件,而parquet文件是不可变的,不支持更新。Delta Lake提供了merge语句来提供类似于更新的接口,但实际上,这些并不是真正的更新,不会改变底层文件。Delta Lake只是在重写整个Parquet文件。这将使大型数据集上的upsert或update列语句非常慢。
1.merge示例
下面通过一个示例来理解和掌握使用merge命令的方法。
首先准备一个样本events_data.csv,其内容如下:
eventType,websitePage click,homepage clck,about page mouseOver,logo
读取该数据文件,并写出到Delta Lake中,代码如下:
// 数据源文件 val path = "/data/data_lake/delta_lake/event_data.csv" // delta路径(HDFS) val outputPath = "/data_lake/delta_lake/events/" spark .read .option("header", "true") .option("charset", "UTF8") .csv(path) .repartition(1) .write .format("delta") .save(outputPath)
看一下_delta_log/ 0000000000000000000000.json事务日志文件中存储了什么内容。执行以下hdfs shell命 ......
......
抱歉,只有登录会员才可浏览!会员登录