Aggregating the output of a stream-static inner join with Structured Streaming



This question concerns Spark 2.4.4.

I am performing a stream-static inner join, which produces the following result:

val orderDetailsJoined = orderItemsDF.join(ordersDF, Seq("CustomerID"), joinType = "inner")
+----------+-------+------+---------+--------+--------+------------+-----------------------+---------------------+---------------+-----------------------+
|CustomerID|OrderID|ItemID|ProductID|Quantity|Subtotal|ProductPrice|OrderItemsTimestamp    |OrderDate            |Status         |OrdersTimestamp        |
+----------+-------+------+---------+--------+--------+------------+-----------------------+---------------------+---------------+-----------------------+
|2         |33865  |84536 |957      |1       |299.98  |299.98      |2019-11-30 18:29:17.893|2014-02-18 00:00:00.0|COMPLETE       |2019-11-30 18:29:19.331|
|2         |33865  |84537 |1073     |1       |199.99  |199.99      |2019-11-30 18:29:17.893|2014-02-18 00:00:00.0|COMPLETE       |2019-11-30 18:29:19.331|
|2         |33865  |84538 |502      |1       |50.0    |50.0        |2019-11-30 18:29:17.893|2014-02-18 00:00:00.0|COMPLETE       |2019-11-30 18:29:19.331|

Here, orderItemsDF is a streaming DataFrame and ordersDF is a static DataFrame.
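
For context, a minimal sketch of how such a pair of DataFrames might be created (the schema, source format, and paths below are assumptions for illustration, not from the original question):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("StreamStaticJoin").getOrCreate()

// Schema of the streaming order-items records (mirrors the columns in the joined output above).
val orderItemsSchema = StructType(Seq(
  StructField("CustomerID", IntegerType),
  StructField("OrderID", IntegerType),
  StructField("ItemID", IntegerType),
  StructField("ProductID", IntegerType),
  StructField("Quantity", IntegerType),
  StructField("Subtotal", DoubleType),
  StructField("ProductPrice", DoubleType),
  StructField("OrderItemsTimestamp", TimestampType)))

// Streaming side: JSON files landing in a directory (hypothetical source and path).
val orderItemsDF = spark.readStream.schema(orderItemsSchema).json("/data/order-items")

// Static side: orders read once from storage (hypothetical path).
val ordersDF = spark.read.parquet("/data/orders")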

Now I am trying to group the result by CustomerID and OrderID, as follows:

val aggResult = orderDetailsJoined
  .withWatermark("OrdersTimestamp", "2 minutes")
  .groupBy(window($"OrdersTimestamp", "1 minute"), $"CustomerID", $"OrderID")
  .agg(sum("Subtotal"))
  .select(col("CustomerID"), col("OrderID"), col("sum(Subtotal)").alias("Total Amount"))

But when I try to view the result as shown below, I get blank output:

val res = aggResult.writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("20 seconds"))
  .option("truncate", "false")
  .start()
res.awaitTermination()
-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-------+------------+
|CustomerID|OrderID|Total Amount|
+----------+-------+------------+
+----------+-------+------------+

And if I run

res.explain(true)

it says: No physical plan. Waiting for data.

Please help!!

tl;dr The OrdersTimestamp values do not appear to be advancing, so the 2-minute watermark and the 1-minute groupBy window cannot do their work.


You use OrdersTimestamp to tell Spark about event time. If it stays at 2019-11-30 18:29:19.331 across the three events you posted and never advances, Spark simply waits until "time" reaches 2019-11-30 18:29:19.331 + "1 minute" + "2 minutes" (the window duration for the groups plus the watermark delay for late events) before it passes the grouped results downstream.
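
To see the mechanics in action, here is a small self-contained sketch (my own illustration, not from the question) using the built-in rate source, whose timestamp column advances continuously. Because event time keeps moving forward, the watermark eventually passes the end of each 1-minute window, and append mode then emits that window's row:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("WatermarkDemo").getOrCreate()
import spark.implicits._

// The rate source generates (timestamp, value) rows with an ever-advancing timestamp,
// so the watermark (max event time seen - 2 minutes) keeps moving forward.
val events = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

val totals = events
  .withWatermark("timestamp", "2 minutes")
  .groupBy(window($"timestamp", "1 minute"))
  .agg(sum("value").alias("total"))

// In append mode a window is emitted only after the watermark passes its end,
// i.e. once roughly window end + 2 minutes of event time have elapsed.
val query = totals.writeStream
  .outputMode("append")
  .format("console")
  .trigger(Trigger.ProcessingTime("20 seconds"))
  .option("truncate", "false")
  .start()

query.awaitTermination()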