玛瑙中的水印触发器不会触发



>我有一个 Onyx 段流,它们是带有时间戳的消息(按时间顺序排列)。说,它们看起来像这样:

{:id 1 :timestamp "2018-09-04 13:15:42" :msg "Hello, World!"}
{:id 2 :timestamp "2018-09-04 21:32:03" :msg "Lorem ipsum"}
{:id 3 :timestamp "2018-09-05 03:01:52" :msg "Dolor sit amet"}
{:id 4 :timestamp "2018-09-05 09:28:16" :msg "Consetetur sadipscing"}
{:id 5 :timestamp "2018-09-05 12:45:33" :msg "Elitr sed diam"}
{:id 6 :timestamp "2018-09-06 08:14:29" :msg "Nonumy eirmod"}
...

对于数据中的每个时间窗口(一天),我想对其所有段的集合运行计算。 即,在示例中,我想对 id 为 1 和 2 的段(9 月 4 日),接下来对 ids 3、4 和 5(9 月 5 日)等进行操作。

Onyx提供窗口和触发器,它们应该开箱即用。如果我使用:window/type :fixed窗口并聚合:window/range [1 :day]相对于:window/window-key :timestamp,我将聚合每天的所有段。

为了仅在一天的所有部分到达时触发我的计算,Onyx 提供了触发行为:onyx.triggers/watermark。根据文档,它应该会触发

如果线段中的:window/window-key值超过活动窗口范围的上限

但是,触发器不会触发,即使我可以看到后面的段已经进来并且几个窗口应该已满。作为健全性检查,我尝试了一个简单的:onyx.triggers/segment触发器,它按预期工作。


我尝试创建最小示例失败:

我修改了固定的窗口玩具作业以测试水印触发,它在那里工作

但是,我发现在这个玩具作业中,水印触发器触发的原因可能是:

它是否关闭了输入通道?也许工作刚刚完成,也可以触发水印。


与水印触发交互的另一个方面是对等方对任务的分布式工作。

Onyx 存储库中对问题 #839(:trigger/emit不适用于:onyx.triggers/watermark)的评论将我指向问题 #840(水印不适用于具有> 1 分区的 Kafka 主题),在那里我找到了这个线索(强调我的):

问题是您的所有数据最终都位于一个分区上,并且水印始终在所有输入对等体上采用最小水印(如果使用本机 kafka 水印,则为给定对等体的最小水印)。

当您使用少量数据调用 g/send 并自动分配分区时,您的所有数据最终都会在一个分区上,这意味着另一个分区的对等方继续发出0 的水印


我发现:

无法

将其与依赖于输入源的当前水印触发器一起使用。您可以尝试拉取以前的水印实现 [...]

但是,在我的任务图中,我想在窗口中聚合的仅在某个中间任务中创建,它们并非源自输入任务本身。输入段仅提供有关如何创建/检索该中间任务的区段内容的信息。

同样,这种结构在上面提到的玩具作业中工作正常。原因是输入通道在某个点关闭,这会结束作业,进而触发水印。所以我的玩具例子实际上不是一个好的模型,因为它不是一个开放式的流。

如果作业确实从实际的输入源获取有问题的段,但没有时间戳,Onyx 似乎提供了指定assign-watermark-fn的空间,这是输入任务的可选属性。该函数在新段的每次到达时设置水印。就我而言,这无济于事,因为段不是来自输入任务。

我现在自己想出了一个解决方法。该文档基本上提供了如何做到这一点的线索:

这是标点符号触发器的

快捷方式函数,当任何数据段具有高于另一个范围的基于时间的窗口键时,将触发标点符号触发器,从而有效地声明不会再有早期窗口的数据到达。

因此,我更改了发出段的任务,以便对于每个段,也会发出另一个类似段的"哨兵":

[{:id 1 :timestamp "2018-09-04 13:15:42" :msg "Hello, World!"}
{:timestamp "2018-09-03 13:15:42" :over :out}]

请注意,:timestamp早于窗口范围(此处为 1 天)。因此,它将被发送到上一个窗口。由于我的数据是按时间顺序输入的,因此:punctuation触发器可以从"哨兵"段(带有关键字 :over)的存在中看出窗口可以关闭。不要忘记驱逐(即:trigger/post-evictor [:all])并从最后一个窗口中扔掉"哨兵"段。在任务结构图中添加:onyx/max-peers 1可确保最终始终到达哨兵,尤其是在使用分组时。

请注意,此解决方法有两个假设:

  1. 数据按时间顺序排列
  2. 没有
  3. 没有段的窗口

最新更新