为什么 Flink 会在数据流 join + 全局窗口上发出重复记录?



我正在学习/试验 Flink,我观察到 DataStream 联接的一些意外行为,并想了解发生了什么......

假设我有两个流,每个流有 10 条记录,我想在id字段中加入它们。假设一个流中的每个记录在另一个流中都有一个匹配的记录,并且每个流中的 ID 都是唯一的。假设我必须使用全局窗口(要求(。

使用 DataStream API(我在 Scala 中的简化代码(加入:

val stream1 = ... // from a Kafka topic on my local machine (I tried with and without .keyBy)
val stream2 = ... 
stream1
.join(stream2)
.where(_.id).equalTo(_.id)
.window(GlobalWindows.create()) // assume this is a requirement
.trigger(CountTrigger.of(1))
.apply {
(row1, row2) => // ... 
}
.print()

结果:

  • 一切都按预期打印,第一个流中的每条记录都与第二个流中的记录连接。

然而:

  • 如果我将其中一个记录(例如,具有更新的字段(从其中一个流重新发送到该流,则会发出😞两个重复的联接事件
  • 如果我重复该操作(有或没有更新的字段(,我将得到 3 个发出的事件,然后是 4、5 等...... 😞

Flink 社区中的某个人可以解释为什么会发生这种情况吗?我本来预计每次只发出 1 个事件。是否有可能通过全局窗口实现这一目标?

相比之下,Flink Table API 在相同的场景中表现得符合预期,但对于我的项目,我对 DataStream API 更感兴趣。

表 API 的示例,按预期工作:

tableEnv
.sqlQuery(
"""
|SELECT *
|  FROM stream1
|       JOIN stream2
|       ON stream1.id = stream2.id
""".stripMargin)
.toRetractStream[Row]
.filter(_._1) // just keep the inserts
.map(...)
.print() // works as expected, after re-sending updated records

谢谢

尼古拉斯

问题是记录永远不会从全局窗口中删除。因此,只要有新记录到达,但旧记录仍然存在,就可以在全局窗口上触发联接操作。

因此,要使其在您的案例中运行,您需要实现自定义 eator。我在一个最小的工作示例中扩展了您的示例并添加了 evictor,我将在代码片段之后解释。

val data1 = List(
(1L, "myId-1"),
(2L, "myId-2"),
(5L, "myId-1"),
(9L, "myId-1"))
val data2 = List(
(3L, "myId-1", "myValue-A"))
val stream1 = env.fromCollection(data1)
val stream2 = env.fromCollection(data2)
stream1.join(stream2)
.where(_._2).equalTo(_._2)
.window(GlobalWindows.create()) // assume this is a requirement
.trigger(CountTrigger.of(1))
.evictor(new Evictor[CoGroupedStreams.TaggedUnion[(Long, String), (Long, String, String)], GlobalWindow](){
override def evictBefore(elements: lang.Iterable[TimestampedValue[CoGroupedStreams.TaggedUnion[(Long, String), (Long, String, String)]]], size: Int, window: GlobalWindow, evictorContext: Evictor.EvictorContext): Unit = {}
override def evictAfter(elements: lang.Iterable[TimestampedValue[CoGroupedStreams.TaggedUnion[(Long, String), (Long, String, String)]]], size: Int, window: GlobalWindow, evictorContext: Evictor.EvictorContext): Unit = {
import scala.collection.JavaConverters._
val lastInputTwoIndex = elements.asScala.zipWithIndex.filter(e => e._1.getValue.isTwo).lastOption.map(_._2).getOrElse(-1)
if (lastInputTwoIndex == -1) {
println("Waiting for the lookup value before evicting")
return
}
val iterator = elements.iterator()
for (index <- 0 until size) {
val cur = iterator.next()
if (index != lastInputTwoIndex) {
println(s"evicting ${cur.getValue.getOne}/${cur.getValue.getTwo}")
iterator.remove()
}
}
}
})
.apply((r, l) => (r, l))
.print()

在应用窗口函数(在本例中为 join(后,将应用除卡器。如果您第二个输入中有多个条目,则不完全清楚您的用例应该如何工作,但就目前而言,ewriter 仅适用于单个条目。

每当有新元素进入窗口时,窗口函数就会立即触发(count = 1(。然后,使用具有相同键的所有元素评估连接。之后,为了避免重复输出,我们从当前窗口中的第一个输入中删除所有条目。由于第二个输入可能在第一个输入之后到达,因此当第二个输入为空时,不会执行逐出。请注意,我的 scala 很生锈;您将能够以更好的方式编写它。运行的输出为:

Waiting for the lookup value before evicting
Waiting for the lookup value before evicting
Waiting for the lookup value before evicting
Waiting for the lookup value before evicting
4> ((1,myId-1),(3,myId-1,myValue-A))
4> ((5,myId-1),(3,myId-1,myValue-A))
4> ((9,myId-1),(3,myId-1,myValue-A))
evicting (1,myId-1)/null
evicting (5,myId-1)/null
evicting (9,myId-1)/null

最后一点:如果表 API 已经提供了一种简洁的方式来执行您想要的操作,我会坚持使用它,然后在需要时将其转换为 DataStream。

相关内容

  • 没有找到相关文章

最新更新