/**
 * Builds a DataFrame from in-memory rows and an explicit schema.
 *
 * The rows are first distributed with `sc.parallelize` and the resulting RDD
 * is handed to `spark.createDataFrame` together with `schema`.
 * Assumes `spark` and `sc` are the session and context already in scope
 * (e.g. in spark-shell or a notebook) -- TODO confirm.
 *
 * @param rows   the rows to load; each row must match `schema`
 * @param schema the column names, types and nullability of the result
 * @return a DataFrame containing `rows` with the given schema
 */
def createDF(rows: Seq[Row], schema: StructType): DataFrame = {
  spark.createDataFrame(
    sc.parallelize(rows),
    schema
  )
}

// Schema shared by the orders and order_updates DataFrames.
val schema = StructType(
  List(
    StructField("order_no", StringType, true),
    StructField("customer_id", StringType, true),
    StructField("quantity", IntegerType, true),
    StructField("cost", DoubleType, true),
    StructField("order_date", DateType, true),
    StructField("last_updated_date", DateType, true)
  )
)

// Create the orders DataFrame (the existing data that will be upserted into).
val orders = Seq(
  Row("001", "u1", 1, 15.00, Date.valueOf("2020-03-01"), Date.valueOf("2020-03-01")),
  Row("002", "u2", 1, 30.00, Date.valueOf("2020-04-01"), Date.valueOf("2020-04-01"))
)
val ordersDF = createDF(orders, schema)
ordersDF.printSchema()
ordersDF.show()
// Output:
//  |-- order_no: string (nullable = true)
//  |-- customer_id: string (nullable = true)
//  |-- quantity: integer (nullable = true)
//  |-- cost: double (nullable = true)
//  |-- order_date: date (nullable = true)
//  |-- last_updated_date: date (nullable = true)
// +--------+-----------+--------+----+----------+-----------------+
// |order_no|customer_id|quantity|cost|order_date|last_updated_date|
// +--------+-----------+--------+----+----------+-----------------+
// |     001|         u1|       1|15.0|2020-03-01|       2020-03-01|
// |     002|         u2|       1|30.0|2020-04-01|       2020-04-01|
// +--------+-----------+--------+----+----------+-----------------+
// Create the order_updates DataFrame: the incoming batch to upsert.
// It holds a newer version of order 002 (cost 20.00, updated 2020-04-02)
// and a brand-new order 003.
val orderUpdates = Seq(
  Row("002", "u2", 1, 20.00, Date.valueOf("2020-04-01"), Date.valueOf("2020-04-02")),
  Row("003", "u3", 3, 50.00, Date.valueOf("2020-04-02"), Date.valueOf("2020-04-02"))
)
val orderUpdatesDF = createDF(orderUpdates, schema)
orderUpdatesDF.printSchema()
orderUpdatesDF.show()
 |-- order_no: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- cost: double (nullable = true)
 |-- order_date: date (nullable = true)
 |-- last_updated_date: date (nullable = true)
+--------+-----------+--------+----+----------+-----------------+
|order_no|customer_id|quantity|cost|order_date|last_updated_date|
+--------+-----------+--------+----+----------+-----------------+
|     002|         u2|       1|20.0|2020-04-01|       2020-04-02|
|     003|         u3|       3|50.0|2020-04-02|       2020-04-02|
+--------+-----------+--------+----+----------+-----------------+
  |  unioned.order_no = grouped.order_no AND
  |  unioned.customer_id = grouped.customer_id AND
  |  unioned.last_updated_date = grouped.max_date
  """.stripMargin
orderMergedDF.show() 
+--------+-----------+--------+----+----------+-----------------+
|order_no|customer_id|quantity|cost|order_date|last_updated_date|
+--------+-----------+--------+----+----------+-----------------+
|     001|         u1|       1|15.0|2020-03-01|       2020-03-01|
|     003|         u3|       3|50.0|2020-04-02|       2020-04-02|
|     002|         u2|       1|20.0|2020-04-01|       2020-04-02|
+--------+-----------+--------+----+----------+-----------------+
// (2) Group by the record key (order_no, customer_id) and keep the most recent
//     last_updated_date in each group; for a key that also appears in the
//     updates this is the date of the newest version.
val grouped = unioned
  .groupBy("order_no", "customer_id")
  .agg(max($"last_updated_date").alias("last_updated_date"))
// grouped.show()
// (3) Inner-join back to the unioned rows on key + date, so that only the row
//     carrying the newest date of each key survives.
val merged = grouped.join(
  unioned,
  Seq("order_no", "customer_id", "last_updated_date")
)
merged.show()
+--------+-----------+-----------------+--------+----+----------+
|order_no|customer_id|last_updated_date|quantity|cost|order_date|
+--------+-----------+-----------------+--------+----+----------+
|     001|         u1|       2020-03-01|       1|15.0|2020-03-01|
|     003|         u3|       2020-04-02|       3|50.0|2020-04-02|
|     002|         u2|       2020-04-02|       1|20.0|2020-04-01|
+--------+-----------+-----------------+--------+----+----------+
// Generic form of the join-based upsert: the key columns and the timestamp
// column are given by name instead of being hard-coded in each expression.
val keys = Seq("order_no", "customer_id")  // columns that identify a record
val timestampCol = "last_updated_date"     // column that decides which version is newest
// Merge the two datasets into one.
val unioned = ordersDF.union(orderUpdatesDF)
// Convert the key names to Seq[org.apache.spark.sql.Column]. The columns are
// resolved against `unioned` -- the DataFrame that is actually grouped below --
// rather than against one of its inputs.
val keysColumns = keys.map(unioned(_))
// Group by the key and keep the latest timestamp per group (for keys that
// need an update this is the timestamp of the newest version).
val grouped = unioned
  .groupBy(keysColumns: _*)
  .agg(max(timestampCol).as(timestampCol))
// Join back on key + timestamp to obtain the upserted result.
val merged = grouped.join(unioned, keys :+ timestampCol)
merged.show()
// (2) Assign every row a row number (_row_number). A window function is used
//     to partition the records and order them inside each partition.
import org.apache.spark.sql.expressions.Window
// Window spec: one partition per order_no, newest last_updated_date first.
// NOTE(review): this partitions by order_no only, whereas the join-based
// variants in this file key on (order_no, customer_id) -- confirm that is intended.
val w = Window
  .partitionBy($"order_no")
  .orderBy($"last_updated_date".desc)
val unioned2 = unioned.withColumn("_row_number", row_number().over(w))
// unioned2.show()
// (3) Keep only the rows with _row_number = 1, i.e. the newest record of each
//     partition, then drop the helper column because it is no longer needed.
val merged = unioned2
  .filter($"_row_number" === 1)
  .drop("_row_number")
merged.orderBy("order_no").show()
+--------+-----------+--------+----+----------+-----------------+
|order_no|customer_id|quantity|cost|order_date|last_updated_date|
+--------+-----------+--------+----+----------+-----------------+
|     001|         u1|       1|15.0|2020-03-01|       2020-03-01|
|     002|         u2|       1|20.0|2020-04-01|       2020-04-02|
|     003|         u3|       3|50.0|2020-04-02|       2020-04-02|
+--------+-----------+--------+----+----------+-----------------+