风暴:几个带有不同尺寸的滑动窗口的最小/最大聚合

  • 本文关键字:窗口 几个 风暴 apache-storm
  • 更新时间 :
  • 英文 :


我想知道在Apache Storm中解决以下问题是什么。

我有一个单个喷口,该喷口会生成具有附加时间戳的整数值。目标是在此流中使用三个滑动窗口执行最小/最大聚合:

  • 最后一个小时
  • 最后一天,即最近24小时

最后一个小时很容易:

topology.setBolt("1h", ...)
    .shuffleGrouping("spout")
    .withWindow(Duration.hours(1), Duration.seconds(10))
    .withTimestampField("timestamp"));

但是,在较长的时间内,我担心窗户的队列大小。当我与最后一个小时的聚合一样,直接从喷口食用元素时,每个元组都会在队列中。

一种可能性是从预处理的" 1H"螺栓中消耗元素。但是,由于我使用了明确的时间戳,因此忽略了从" 1H"螺栓到达的较晚元组。1小时的滞后不是一个选择,因为这延迟了窗口的评估。有没有一种方法可以"允许"较晚的元素而不会影响结果的及时性?

当然,我也可以每小时存储一个总计,然后在过去24小时内计算最小值,包括" 1H"流的最新值。但是我很好奇是否有一种使用风暴手段正确执行此操作的方法。

更新1

感谢Arunmahadevan的答案,我更改了1H分钟螺栓,以发射最小元组,并在相应的1H窗口中使用所有元组的最大时间戳。这样一来,由于较晚到达而消耗的螺栓不会丢弃元组。我还引入了一个新的original-timestamp,以保留最小元组的原始时间戳。

更新2

我终于仅通过发射1H分钟螺栓中的状态变化找到了一种更好的方法。只要没有收到新的元素,就不会在消耗螺栓中延长风暴的时间,因此可以防止到达较晚的问题。另外,我可以保留原始的时间戳而不将其复制到单独的字段中。

我认为将最小的最小发射从" 1H"发射到" 24H"螺栓应该有效并保持" 24H"队列大小。

如果您配置了滞后,则仅在滞后之后调用螺栓的执行(即事件时间跨越滑动间隔 滞后(。

说,如果" 1H"螺栓配置为滞后1分钟,则仅在事件时间越过02:01之后,将在01:00-02:00之间为元组调用执行。(即,螺栓已经看到了带有时间戳> = 02:01的事件(。但是,执行将仅在01:00至02:00之间接收元组。

现在,如果您计算最后一小时的最小值,并将结果发射到一个" 24h"螺栓中,该螺栓的滑动间隔为1小时,滞后= 0,则一旦即将到来的事件的时间戳将触发下一个小时。如果您以02:00为02:00的时间戳发射01:00-02:00,则" 24H"窗口将触发(对于前一天之间的事件02:00至02:00(,一旦接收到Min Event由于事件时间越过了下一个小时,因此配置的滞后为0。

最新更新