This question relates to Spark 2.4.4.
I am performing a stream-static inner join, with the following result:
val orderDetailsJoined = orderItemsDF.join(ordersDF, Seq("CustomerID"), joinType = "inner")
+----------+-------+------+---------+--------+--------+------------+-----------------------+---------------------+---------------+-----------------------+
|CustomerID|OrderID|ItemID|ProductID|Quantity|Subtotal|ProductPrice|OrderItemsTimestamp |OrderDate |Status |OrdersTimestamp |
+----------+-------+------+---------+--------+--------+------------+-----------------------+---------------------+---------------+-----------------------+
|2 |33865 |84536 |957 |1 |299.98 |299.98 |2019-11-30 18:29:17.893|2014-02-18 00:00:00.0|COMPLETE |2019-11-30 18:29:19.331|
|2 |33865 |84537 |1073 |1 |199.99 |199.99 |2019-11-30 18:29:17.893|2014-02-18 00:00:00.0|COMPLETE |2019-11-30 18:29:19.331|
|2 |33865 |84538 |502 |1 |50.0 |50.0 |2019-11-30 18:29:17.893|2014-02-18 00:00:00.0|COMPLETE |2019-11-30 18:29:19.331|
Here "orderItemsDF" is the streaming DataFrame and "ordersDF" is the static DataFrame.
Now I try to group the result by "CustomerID" and "OrderID", as follows:
val aggResult = orderDetailsJoined.withWatermark("OrdersTimestamp", "2 minutes").
groupBy(window($"OrdersTimestamp", "1 minute"), $"CustomerID", $"OrderID").
agg(sum("Subtotal")).
select(col("CustomerID"), col("OrderID"), col("sum(Subtotal)").alias("Total Amount"))
But when I try to view the result as shown below, it gives me blank output:
val res = aggResult.writeStream.outputMode("append").format("console").trigger(Trigger.ProcessingTime("20 seconds")).option("truncate", "false").start()
res.awaitTermination()
-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-------+------------+
|CustomerID|OrderID|Total Amount|
+----------+-------+------------+
+----------+-------+------------+
If I run
res.explain(true)
it says: No physical plan. Waiting for data.
Please help!!
tl;dr The OrdersTimestamp values do not seem to advance, so the 2-minute watermark and the 1-minute groupBy windows cannot do their job.

Spark uses OrdersTimestamp as the event time. If it stays at 2019-11-30 18:29:19.331 across the three events you posted, and time never advances, Spark simply waits until the watermark passes 2019-11-30 18:29:19.331 + "1 minute" + "2 minutes" before passing the grouped results downstream.
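The arithmetic behind this can be sketched in plain Scala (no Spark needed). This is a minimal illustration, assuming the standard append-mode rule that a window is emitted only once the watermark (max observed event time minus the watermark delay) passes the window's end; the object name `WatermarkCheck` is just for this example:

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

object WatermarkCheck {
  def main(args: Array[String]): Unit = {
    // Max event time observed so far — all three posted rows share this value.
    val maxEventTime = LocalDateTime.parse("2019-11-30T18:29:19.331")

    // Watermark = max event time minus the declared delay ("2 minutes").
    val watermark = maxEventTime.minus(2, ChronoUnit.MINUTES)

    // The 1-minute tumbling window holding the events is [18:29:00, 18:30:00).
    val windowEnd = LocalDateTime.parse("2019-11-30T18:30:00")

    // In append mode the window's aggregate is emitted only once
    // the watermark reaches or passes the window end.
    val emitted = !watermark.isBefore(windowEnd)
    println(s"watermark = $watermark, windowEnd = $windowEnd, emitted = $emitted")
    // The watermark (18:27:19.331) is still before 18:30:00, so nothing
    // is emitted — and with no newer events arriving, it never will be.
  }
}
```

So the blank batches are expected: until rows with later OrdersTimestamp values arrive and push the watermark past the window end, append mode has nothing final to output.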