我是spark的新手,我有一个用例,希望以追加输出的方式发出聚合。我知道附加模式不适合聚合,只有当我们提供窗口操作(以及水印(时,spark才支持该功能。
现在,我想要spark做的是忘记旧消息(已经在流中处理的消息(,只取在上次输出和当前时间之间到达的消息(意味着当前微批(,并计算其上的聚合并发出结果。
我认为它相当简单,但找不到如何做到这一点的例子
首先,我们不必只在结构化流中使用基于窗口的聚合,我们也可以在event_time列上进行聚合,并在该列上设置水印。以下是结构化流媒体节目指南中的一些要点。
聚合必须具有事件时间列或事件时间列上的窗口。
必须在与聚合中使用的时间戳列相同的列上调用
withWatermark。例如,df.withWatermark("时间","1分钟"(.groupBy("时间2"(.count((在追加输出模式下无效,因为水印是在聚合列的不同列上定义的。
了解更多详细信息,请参阅Spark Structured Streaming编程指南。
其次,
现在,我想让spark忘记旧消息(已经在流中处理的消息(,只取在上次输出和当前时间之间到达的消息(意味着当前微批(,并计算其上的聚合并发出结果。
要进行不需要旧数据的聚合,可以通过将水印持续时间设置为"0"来完成;0分钟";,这意味着流引擎在聚合时不会等待任何旧记录。
例如->假设我们有以下df
+-------------------+--------------+
|end_time |transaction_id|
+-------------------+--------------+
|2020-12-14 15:08:52|d6 |
|2020-12-14 15:08:52|02 |
+-------------------+--------------+
聚合前将水印设置为-
val waterMarkDf = df.withWatermark("end_time","0 minutes")
接着是攻击。需要记住的一点是,结构化流媒体引擎不能保证在聚合时会丢弃所有旧记录。有关此方面的更多详细信息,请阅读Spark结构化流媒体编程指南中的"使用水印聚合的语义保证"一节