在Spark中实现增量合并(upsert/merge实现)
2022-07-21 20:34:48.0
通常会将大量数据抽取到Hadoop分布式文件系统(HDFS)中进行分析。通常情况下,我们需要用新的变化定期更新这些数据。很长一段时间以来,实现这一目标的最常见方法是使用Apache Hive增量地将新的或更新的记录合并到现有数据集中。增量合并也可以使用Apache Spark执行。在这篇博客中,我将探索如何使用Spark SQL和Spark DataFrame增量更新数据,并演示三种不同的实现方法。
1. 什么是增量合并?
考虑下面的订单表orders:
order_no | customer_id | quantity | cost | order_date | last_updated_date |
---|---|---|---|---|---|
001 | u1 | 1 | ¥15.00 | 2022-03-01 | 2022-03-01 |
002 | u2 | 1 | ¥30.00 | 2022-04-01 | 2022-04-01 |
现在,假设我们在order_updates表中收到了对订单号“002”的成本更新,如下所示:
order_no | customer_id | quantity | cost | order_date | last_updated_date |
---|---|---|---|---|---|
002 | u2 | 1 | ¥20.00 | 2022-04-01 | 2022-04-02 |
003 | u3 | 3 | ¥50.00 | 2022-04-02 | 2022-04-02 |
注意到,其中对002号订单的成本(cost)和最后更新日期(last_updated_date)这两个字段值作了更新。而003号订单是新增加的订单。现在要求将执行增量合并,将order_updates表中的数据合并更新到orders表中,以生成一张更新的表,更新以后的内容如下:
order_no | customer_id | quantity | cost | order_date | last_updated_date |
---|---|---|---|---|---|
001 | u1 | 1 | ¥15.00 | 2022-03-01 | 2022-03-01 |
002 | u2 | 1 | ¥20.00 | 2022-04-01 | 2022-04-02 |
003 | u3 | 3 | ¥50.00 | 2022-04-02 | 2022-04-02 |
其中001号订单没有改变,002号的值被修改为最新的值,003号订单作为新增的订单添加到表中。这种将新表中的数据与旧表(也叫目标表)中的数据进行合并,如果有重复的记录,则用新记录值修改旧的记录值(即update操作),如果有增加的记录,则添加进去(即insert操作),这种合并就是“增量更新”,也称为upsert或merge合并。
遗憾的是,在Apache Spark 2.x中不支持合并操作功能。但是,我们可以采用变通的方法实现同样的upsert功能。接下来就给大家讲解和演示三种实现合并操作的方法。
准备演示数据
首先,导入相关的包,并定义一个创建DataFrame的辅助方法,代码如下:
import java.sql.Date import org.apache.spark.sql.types._ import org.apache.spark.sql._ // 定义DataFrame创建方法 def createDF(rows: Seq[Row], schema: StructType): DataFrame = { spark.createDataFrame( sc.parallelize(rows), schema ) }
创建订单表orders,代码如下:
val schema = StructType( List( StructField("order_no", StringType, true), StructField("customer_id", StringType, true), StructField("quantity", IntegerType, true), StructField("cost", DoubleType, true), StructField("order_date", DateType, true), StructField("last_updated_date", DateType, true) ) ) // 创建 orders DataFrame val orders = Seq( Row("001", "u1", 1, 15.00, Date.valueOf("2020-03-01"), Date.valueOf("2020-03-01")), Row("002", "u2", 1, 30.00, Date.valueOf("2020-04-01"), Date.valueOf("2020-04-01")) ) val ordersDF = createDF(orders, schema) ordersDF.printSchema() ordersDF.show()
执行以上代码,输出内容如下:
root |-- order_no: string (nullable = true) |-- customer_id: string (nullable = true) |-- quantity: integer (nullable = true) |-- cost: double (nullable = true) |-- order_date: date (nullable = true) |-- last_updated_date: date (nullable = true) +--------+-----------+--------+----+----------+-----------------+ |order_no|customer_id|quantity|cost|order_date|last_updated_date| +--------+-----------+--------+----+----------+-----------------+ | 001| u1| 1|15.0|2020-03-01| 2020-03-01| | 002| u2| 1|30.0|2020-04-01| 2020-04-01| +--------+-----------+--------+----+----------+-----------------+
然后创建更新表创建order_updates DataFrame,代码如下:
// 创建order_updates DataFrame val orderUpdates = Seq( Row("002", "u2", 1, 20.00, Date.valueOf("2020-04-01"), Date.valueOf("2020-04-02")), Row("003", "u3", 3, 50.00, Date.valueOf("2020-04-02"), Date.valueOf("2020-04-02")) ) val orderUpdatesDF = createDF(orderUpdates, schema) orderUpdatesDF.printSchema() orderUpdatesDF.show()
执行以上代码,输出内容如下:
root |-- order_no: string (nullable = true) |-- customer_id: string (nullable = true) |-- quantity: integer (nullable = true) |-- cost: double (nullable = true) |-- order_date: date (nullable = true) |-- last_updated_date: date (nullable = true) +--------+-----------+--------+----+----------+-----------------+ |order_no|customer_id|quantity|cost|order_date|last_updated_date| +--------+-----------+--------+----+----------+-----------------+ | 002| u2| 1|20.0|2020-04-01| 2020-04-02| | 003| u3| 3|50.0|2020-04-02| 2020-04-02| +--------+-----------+--------+----+----------+-----------------+
方法一:使用SQL查询实现增量合并
第一种方法是使用SQL语句来实现增量合并。为此,首先将ordersDF和orderUpdateDF注册到临时视图中,代码如下:
// 创建临时视图产 ordersDF.createOrReplaceTempView("orders") orderUpdatesDF.createOrReplaceTempView("order_updates")
然后执行实现增量合并的SQL语句,代码如下:
// 执行SQL语句 val orderMergedDF = spark.sql( """ |SELECT unioned.* |FROM ( | SELECT * FROM orders x | UNION ALL | SELECT * FROM order_updates y |) unioned |JOIN |( | SELECT | order_no, customer_id, | max(last_updated_date) as max_date | FROM ( | SELECT * FROM orders | UNION ALL | SELECT * FROM order_updates | ) t | GROUP BY | order_no, customer_id |) grouped |ON | unioned.order_no = grouped.order_no AND | unioned.customer_id = grouped.customer_id AND | unioned.last_updated_date = grouped.max_date """.stripMargin ) orderMergedDF.show()
执行以上代码,输出内容如下:
+--------+-----------+--------+----+----------+-----------------+ |order_no|customer_id|quantity|cost|order_date|last_updated_date| +--------+-----------+--------+----+----------+-----------------+ | 001| u1| 1|15.0|2020-03-01| 2020-03-01| | 003| u3| 3|50.0|2020-04-02| 2020-04-02| | 002| u2| 1|20.0|2020-04-01| 2020-04-02| +--------+-----------+--------+----+----------+-----------------+
从输出结果可以看出,已经实现了upsert功能。
方法二:使用DataFrame API实现增量合并
第二种方法是使用DataFrame API,根据方法一的思想,实现增量合并。代码如下:
// (1) 合并两个数据集 val unioned = ordersDF.union(orderUpdatesDF) //unioned.show() // (2) 执行分组,同一组的(需要update的数据)取最新日期 val grouped = unioned .groupBy($"order_no", $"customer_id") .agg(max("last_updated_date").as("last_updated_date")) //grouped.show() // (3) 使用给定列与另一个DataFrame进行内连接 val merged = grouped.join(unioned, Seq("order_no", "customer_id", "last_updated_date")) merged.show()
执行以上代码,输出结果如下:
+--------+-----------+-----------------+--------+----+----------+ |order_no|customer_id|last_updated_date|quantity|cost|order_date| +--------+-----------+-----------------+--------+----+----------+ | 001| u1| 2020-03-01| 1|15.0|2020-03-01| | 003| u3| 2020-04-02| 3|50.0|2020-04-02| | 002| u2| 2020-04-02| 1|20.0|2020-04-01| +--------+-----------+-----------------+--------+----+----------+
可以看到,与方法一相似,同样实现了upsert功能。
上述方法还可以重构为更加通用的形式,代码如下:
val keys = Seq("order_no", "customer_id") // 指定key列 val timestampCol = "last_updated_date" // 转换为 Seq[org.apache.spark.sql.Column] val keysColumns = keys.map(ordersDF(_)) // 合并两个数据集 val unioned = ordersDF.union(orderUpdatesDF) // 执行分组,同一组的(需要update的数据)取最新日期 val grouped = unioned .groupBy(keysColumns: _*) .agg(max(timestampCol).as(timestampCol)) // 通过join连接,取upsert后的结果 val merged = grouped.join(unioned, keys :+ timestampCol) merged.show()
方法三:使用DataFrame API和窗口函数实现
还可以通过窗口函数Window和union函数来模拟,步骤如下:
- (1) 首先将旧表(目标表)和新表合并,使用unionAll()函数;
- (2) 然后,使用窗口函数对记录进行分组,并基于分组为每一行分配一个行号(例如,_row_number);
- (3) 最后,筛选DataFrame,只保留_row_number = 1,因为它代表一个新记录。还要删除_row_number列,因为它不再需要了。
实现代码如下:
// (1) 合并两个数据集 val unioned = ordersDF.union(orderUpdatesDF) //unioned.show() // (2) 为每一行分配一个行号(_row_number)。可以使用窗口函数对记录进行分组和分区。 import org.apache.spark.sql.expressions.Window // 定义窗口规范 val w = Window.partitionBy("order_no").orderBy($"last_updated_date".desc) val unioned2 = unioned.withColumn("_row_number", row_number().over(w)) //unioned2.show() // (3) 筛选DataFrame,只保留_row_number = 1,因为它代表一个新记录。还要删除_row_number列,因为它不再需要了。 val merged = unioned2.where("_row_number = 1").drop("_row_number") merged.orderBy("order_no").show()
执行以上代码,输出内容如下:
+--------+-----------+--------+----+----------+-----------------+ |order_no|customer_id|quantity|cost|order_date|last_updated_date| +--------+-----------+--------+----+----------+-----------------+ | 001| u1| 1|15.0|2020-03-01| 2020-03-01| | 002| u2| 1|20.0|2020-04-01| 2020-04-02| | 003| u3| 3|50.0|2020-04-02| 2020-04-02| +--------+-----------+--------+----+----------+-----------------+
可以看到,与前两种方法一样,同样实现了upsert功能。