如何在 Akka 流中对事件进行会话/分组



要求是我想编写一个 Akka 流应用程序,该应用程序侦听来自 Kafka 的连续事件,然后根据嵌入在每个事件中的某个 id 值在时间范围内对事件数据进行会话化。

例如,假设我的时间范围窗口是两分钟,在前两分钟内,我得到以下四个事件:

输入:

{"message-domain":"1234","id":1,"aaa":"bbb"}
{"message-domain":"1234","id":2,"aaa":"bbb"}
{"message-domain":"5678","id":4,"aaa":"bbb"}
{"message-domain":"1234","id":3,"aaa":"bbb"}

然后在输出中,对这些事件进行分组/会话化后,我将只有两个基于其消息域值的事件。

输出:

{"message-domain":"1234",messsages:[{"id":1,"aaa":"bbb"},{"id":2,"aaa":"bbb"},{"id":4,"aaa":"bbb"}]}
{"message-domain":"5678",messsages:[{"id":3,"aaa":"bbb"}]}

我希望这能实时发生。关于如何实现这一目标的任何建议?

要在时间范围内对事件进行分组,您可以使用Flow.groupedWithin

val maxCount : Int = Int.MaxValue
val timeWindow = FiniteDuration(2L, TimeUnit.MINUTES)
val timeWindowFlow : Flow[String, Seq[String]] =
  Flow[String] groupedWithin (maxCount, timeWindow)

相关内容

  • 没有找到相关文章

最新更新