如何处理流中从上次输出到接收器的消息



我是spark的新手,我有一个用例,希望以追加输出的方式发出聚合。我知道附加模式不适合聚合,只有当我们提供窗口操作(以及水印(时,spark才支持该功能。

现在,我想要spark做的是忘记旧消息(已经在流中处理的消息(,只取在上次输出和当前时间之间到达的消息(意味着当前微批(,并计算其上的聚合并发出结果。

我认为它相当简单,但找不到如何做到这一点的例子

首先,我们不必只在结构化流中使用基于窗口的聚合,我们也可以在event_time列上进行聚合,并在该列上设置水印。以下是结构化流媒体节目指南中的一些要点。

聚合必须具有事件时间列或事件时间列上的窗口。

必须在与聚合中使用的时间戳列相同的列上调用

withWatermark。例如,df.withWatermark("时间","1分钟"(.groupBy("时间2"(.count((在追加输出模式下无效,因为水印是在聚合列的不同列上定义的。

了解更多详细信息,请参阅Spark Structured Streaming编程指南。

其次,

现在,我想让spark忘记旧消息(已经在流中处理的消息(,只取在上次输出和当前时间之间到达的消息(意味着当前微批(,并计算其上的聚合并发出结果。

要进行不需要旧数据的聚合,可以通过将水印持续时间设置为"0"来完成;0分钟";,这意味着流引擎在聚合时不会等待任何旧记录。

例如->假设我们有以下df

+-------------------+--------------+
|end_time           |transaction_id|
+-------------------+--------------+
|2020-12-14 15:08:52|d6            |
|2020-12-14 15:08:52|02            |
+-------------------+--------------+

聚合前将水印设置为-

val waterMarkDf = df.withWatermark("end_time","0 minutes")

接着是攻击。需要记住的一点是,结构化流媒体引擎不能保证在聚合时会丢弃所有旧记录。有关此方面的更多详细信息,请阅读Spark结构化流媒体编程指南中的"使用水印聚合的语义保证"一节

最新更新