我在Spark结构化流foreachBatch()
中使用手动维护一个滑动窗口,由最后200000个条目组成。每个微批处理大约有50行。在这个滑动滑动窗口上,我手动计算我想要的度量,如min, max等。
Spark还提供了滑动窗口函数。但是我有两个问题:
-
滑动窗口更新的时间间隔只能基于时间段进行配置,但似乎不可能强制每个微批进入时进行更新。有没有可能我没看到?
-
更大的问题:似乎我只能做聚合使用分组,如:
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
但是我不想在多个滑动窗口上分组。我需要像现有的foreachBatch()这样的东西,它不仅允许我访问当前批处理,还允许我访问/或当前滑动窗口。有类似的东西吗?
感谢您的宝贵时间!您可以使用flatMapGroupsWithState功能来实现这一点。基本上,你可以在内部状态下存储/保持更新以前的批次(只有你需要的信息),并在下一个批次中使用它
你可以参考下面的链接
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html arbitrary-stateful-operations
https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-demo-arbitrary-stateful-streaming-aggregation-flatMapGroupsWithState.html