// Imports used throughout the examples
import java.sql.Date
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

// Helper that builds a DataFrame from rows and a schema
def createDF(rows: Seq[Row], schema: StructType): DataFrame = {
  spark.createDataFrame(sc.parallelize(rows), schema)
}

// Schema shared by both datasets
val schema = StructType(List(
  StructField("order_no", StringType, true),
  StructField("customer_id", StringType, true),
  StructField("quantity", IntegerType, true),
  StructField("cost", DoubleType, true),
  StructField("order_date", DateType, true),
  StructField("last_updated_date", DateType, true)
))
// Create the orders DataFrame
val orders = Seq(
  Row("001", "u1", 1, 15.00, Date.valueOf("2020-03-01"), Date.valueOf("2020-03-01")),
  Row("002", "u2", 1, 30.00, Date.valueOf("2020-04-01"), Date.valueOf("2020-04-01"))
)
val ordersDF = createDF(orders, schema)
ordersDF.printSchema()
ordersDF.show()
root
 |-- order_no: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- cost: double (nullable = true)
 |-- order_date: date (nullable = true)
 |-- last_updated_date: date (nullable = true)
+--------+-----------+--------+----+----------+-----------------+
|order_no|customer_id|quantity|cost|order_date|last_updated_date|
+--------+-----------+--------+----+----------+-----------------+
| 001| u1| 1|15.0|2020-03-01| 2020-03-01|
| 002| u2| 1|30.0|2020-04-01| 2020-04-01|
+--------+-----------+--------+----+----------+-----------------+
// Create the order_updates DataFrame: order 002 is an update, 003 is new
val orderUpdates = Seq(
  Row("002", "u2", 1, 20.00, Date.valueOf("2020-04-01"), Date.valueOf("2020-04-02")),
  Row("003", "u3", 3, 50.00, Date.valueOf("2020-04-02"), Date.valueOf("2020-04-02"))
)
val orderUpdatesDF = createDF(orderUpdates, schema)
orderUpdatesDF.printSchema()
orderUpdatesDF.show()
root
 |-- order_no: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- cost: double (nullable = true)
 |-- order_date: date (nullable = true)
 |-- last_updated_date: date (nullable = true)
+--------+-----------+--------+----+----------+-----------------+
|order_no|customer_id|quantity|cost|order_date|last_updated_date|
+--------+-----------+--------+----+----------+-----------------+
| 002| u2| 1|20.0|2020-04-01| 2020-04-02|
| 003| u3| 3|50.0|2020-04-02| 2020-04-02|
+--------+-----------+--------+----+----------+-----------------+
// Method 1: upsert with Spark SQL
// (1) Union the two datasets and register the result as a temp view
val unioned = ordersDF.union(orderUpdatesDF)
unioned.createOrReplaceTempView("unioned")
// (2) For each key, find the latest last_updated_date
spark.sql(
  """
    |SELECT order_no, customer_id, MAX(last_updated_date) AS max_date
    |FROM unioned
    |GROUP BY order_no, customer_id
  """.stripMargin).createOrReplaceTempView("grouped")
// (3) Join back so only the latest version of each order survives
val orderMergedDF = spark.sql(
  """
    |SELECT unioned.*
    |FROM unioned JOIN grouped ON
    |  unioned.order_no = grouped.order_no AND
    |  unioned.customer_id = grouped.customer_id AND
    |  unioned.last_updated_date = grouped.max_date
  """.stripMargin)
orderMergedDF.show()
+--------+-----------+--------+----+----------+-----------------+
|order_no|customer_id|quantity|cost|order_date|last_updated_date|
+--------+-----------+--------+----+----------+-----------------+
| 001| u1| 1|15.0|2020-03-01| 2020-03-01|
| 003| u3| 3|50.0|2020-04-02| 2020-04-02|
| 002| u2| 1|20.0|2020-04-01| 2020-04-02|
+--------+-----------+--------+----+----------+-----------------+
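The same merge also fits in a single statement; a minimal sketch, assuming the unioned temp view registered above (orderMergedDF2 is a hypothetical name):

// Single-statement variant: rank each order's versions by recency, keep rank 1
val orderMergedDF2 = spark.sql(
  """
    |SELECT order_no, customer_id, quantity, cost, order_date, last_updated_date
    |FROM (
    |  SELECT *, ROW_NUMBER() OVER (
    |    PARTITION BY order_no, customer_id
    |    ORDER BY last_updated_date DESC) AS rn
    |  FROM unioned
    |) ranked
    |WHERE rn = 1
  """.stripMargin)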
// Method 2: upsert with the DataFrame API
// (1) Union the two datasets (same unioned as above)
// (2) Group by the key columns and take the latest date within each group
//     (these are the rows that should survive the upsert)
val grouped = unioned
  .groupBy($"order_no", $"customer_id")
  .agg(max("last_updated_date").as("last_updated_date"))
//grouped.show()
// (3) Join back to unioned on the given columns (an inner join by default)
val merged = grouped.join(unioned, Seq("order_no", "customer_id", "last_updated_date"))
merged.show()
+--------+-----------+-----------------+--------+----+----------+
|order_no|customer_id|last_updated_date|quantity|cost|order_date|
+--------+-----------+-----------------+--------+----+----------+
| 001| u1| 2020-03-01| 1|15.0|2020-03-01|
| 003| u3| 2020-04-02| 3|50.0|2020-04-02|
| 002| u2| 2020-04-02| 1|20.0|2020-04-01|
+--------+-----------+-----------------+--------+----+----------+
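Note that a join on a sequence of column names moves the join keys to the front of the result, as the output above shows. If the original column order matters, a select can restore it; a minimal sketch reusing ordersDF's column list (reorderedDF is a hypothetical name):

// Restore the original column order after the join
val reorderedDF = merged.select(ordersDF.columns.map(col): _*)
reorderedDF.show()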
// A parameterized version of Method 2
val keys = Seq("order_no", "customer_id") // key columns
val timestampCol = "last_updated_date"
// Convert to Seq[org.apache.spark.sql.Column]; col(_) keeps the columns
// unbound so they resolve against unioned rather than ordersDF
val keysColumns = keys.map(col(_))
// Union the two datasets
val unioned = ordersDF.union(orderUpdatesDF)
// Group by key and take the latest date within each group
val grouped = unioned
  .groupBy(keysColumns: _*)
  .agg(max(timestampCol).as(timestampCol))
// Join back on key + timestamp to get the upserted result
val merged = grouped.join(unioned, keys :+ timestampCol)
merged.show()
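The steps above can be wrapped into a reusable helper; a minimal sketch (upsert is a hypothetical name, not a built-in API), assuming both inputs share the same schema:

// Hypothetical helper: for each key, keep the row with the latest timestamp
def upsert(base: DataFrame, updates: DataFrame,
           keys: Seq[String], timestampCol: String): DataFrame = {
  val all = base.union(updates)
  val latest = all
    .groupBy(keys.map(col(_)): _*)
    .agg(max(timestampCol).as(timestampCol))
  latest.join(all, keys :+ timestampCol)
}

// Usage: produces the same merged result as above
upsert(ordersDF, orderUpdatesDF, Seq("order_no", "customer_id"), "last_updated_date").show()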
// Method 3: upsert with a window function
// (1) Union the two datasets (same unioned as above)
// (2) Assign a row number (_row_number) to each row, partitioning by the key
//     columns and ordering by last_updated_date descending
import org.apache.spark.sql.expressions.Window
// Define the window specification
val w = Window.partitionBy("order_no", "customer_id").orderBy($"last_updated_date".desc)
val unioned2 = unioned.withColumn("_row_number", row_number().over(w))
//unioned2.show()
// (3) Keep only the rows with _row_number = 1, since each represents the newest
//     record for its key, then drop _row_number as it is no longer needed
val merged = unioned2.where("_row_number = 1").drop("_row_number")
merged.orderBy("order_no").show()
+--------+-----------+--------+----+----------+-----------------+
|order_no|customer_id|quantity|cost|order_date|last_updated_date|
+--------+-----------+--------+----+----------+-----------------+
| 001| u1| 1|15.0|2020-03-01| 2020-03-01|
| 002| u2| 1|20.0|2020-04-01| 2020-04-02|
| 003| u3| 3|50.0|2020-04-02| 2020-04-02|
+--------+-----------+--------+----+----------+-----------------+
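One caveat: if two versions of an order share the same last_updated_date, row_number breaks the tie arbitrarily (and Method 2's max-plus-join would keep both rows). A secondary sort key makes the pick deterministic; a minimal sketch, with cost as an assumed tiebreaker:

// Deterministic tie-breaking: latest date first, then highest cost
val wDet = Window
  .partitionBy("order_no", "customer_id")
  .orderBy($"last_updated_date".desc, $"cost".desc)
val mergedDet = unioned
  .withColumn("_row_number", row_number().over(wDet))
  .where($"_row_number" === 1)
  .drop("_row_number")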