合并更新(upsert)

这一节,将解释如何更新表列,以及如何使用merge命令执行upsert操作。

Delta Lake存储使用parquet文件,而parquet文件是不可变的,不支持更新。Delta Lake提供了merge语句来提供类似于更新的接口,但实际上,这些并不是真正的更新,不会改变底层文件。Delta Lake只是在重写整个Parquet文件。这将使大型数据集上的upsert或update列语句非常慢。

1.merge示例

下面通过一个示例来理解和掌握使用merge命令的方法。

首先准备一个样本events_data.csv,其内容如下:

eventType,websitePage
click,homepage
clck,about page
mouseOver,logo

读取该数据文件,并写出到Delta Lake中,代码如下:

// 数据源文件
val path = "/data/data_lake/delta_lake/event_data.csv"

// delta路径(HDFS)
val outputPath = "/data_lake/delta_lake/events/"

spark
  .read
  .option("header", "true")
  .option("charset", "UTF8")
  .csv(path)
  .repartition(1)
  .write
  .format("delta")
  .save(outputPath) 

看一下_delta_log/ 0000000000000000000000.json事务日志文件中存储了什么内容。执行以下hdfs shell命 ......

......

抱歉,只有登录会员才可浏览!会员登录


《PySpark原理深入与编程实战》