如何使用 Apache Flink 会话化流



我想会话这个流:1,1,1,2,2,2,2,3,3,3,3,3,3,3,0,3,3,3,3,5,...到这些会话:

1,1,1
2,2,2,2,2
3,3,3,3,3,3,3
0
3,3,3
5

我编写了 CustomTrigger 来检测流元素何时从 1 更改为 2(2 到 3,3 变为 0 等(,然后触发触发器。但这不是解决方案,因为当我处理 2 的第一个元素并触发触发器时,窗口将是 [1,1,1,2],但我需要在 1 的最后一个元素上触发触发器。

这是我的自定义触发器类中 onElement 函数的 pesudo:

override def onElement(element: Session, timestamp: Long, window: W, ctx: TriggerContext): TriggerResult = {
    if (prevState == element.value) {
      prevState = element.value
      TriggerResult.CONTINUE
    } else {
      prevState = element.value
      TriggerResult.FIRE
    }
}

如何解决这个问题?

我认为带有ListStateFlatMapFunction是实现此用例的最简单方法。

当新元素到达时(即调用flatMap()方法(,检查值是否更改。如果值未更改,则将元素追加到状态。如果值已更改,则将当前列表状态作为会话发出,清除列表,然后将新元素作为第一个元素插入列表状态。

但是,您应该记住,这假设元素的顺序被保留。Flink 确保在分区内,即只要元素不被洗牌并且所有运算符都以相同的并行性运行。

相关内容

  • 没有找到相关文章

最新更新