云数据流:一旦触发器不起作用



>我有一个从无限源读取的数据流管道。我的窗口大小是 10 小时,我正在尝试使用 TestStream 测试我的触发器。如果 Window 中同一键的元素计数至少达到 2,我的触发器将发出早期结果。我有以下触发器来实现这一点:

input.apply(Window.into(FixedWindows.of(Duration.standardHours(12)))              .triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterPane.elementCountAtLeast(2)))
.apply(Count.perElement())

我们还尝试了:

Repeatedly.forever(AfterPane.elementCountAtLeast(2)).orFinally(AfterWatermark.pastEndOfWindow())

我希望在断言结果时提前解雇,但是我没有得到所有的结果

PAssert.that(pipeline).inWindow(..)..

我做错了什么?此外,重复运行相同的测试会产生不同的结果,这意味着触发器返回不同的值。

触发是不确定的。它会在满足触发条件后的一段时间内为您提供提前点火。然后,它将在再次满足触发条件后的一段时间内再次提前点火。

触发后发出的实际选择由运行器确定。如果您使用的是批处理运行器,则可能会等到所有数据都可用。您期望每个键/窗口有多少输入?您使用的是哪种跑步者?

最新更新