我正在使用风暴实现滑动窗口:
从这里
这是我的拓扑:
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbitSpout", new RabbitMQSpout());
builder.setBolt("filterBolt", new FilteringBolt()).shuffleGrouping("rabbitSpout");
builder.setBolt("HourStatisticsBolt", new SlidingWindowStatisticsBolt()
.withWindow(new BaseWindowedBolt.Duration(60, TimeUnit.MINUTES),
new BaseWindowedBolt.Duration(10, TimeUnit.SECONDS))
.withTimestampField("timestamp")).shuffleGrouping("filterBolt");
在我的SlidingWindowStatisticsBolt的执行方法中,我想获取窗口开始或结束的时间戳。在我的螺栓中,如何获得窗口长度和滑动持续时间?
由于您使用的是事件时间(withTimestampField
(,因此窗口是根据周期性水印计算的。现在,窗口开始/结束时间不会显示在元组中。
在最新的 storm 主分支中,TupleWindow 有一个 getTimestamp
方法,该方法返回窗口结束时间戳,并适用于基于处理和事件时间的窗口。这将在 storm 的未来版本(2.0 版本(中提供。如果您希望将其向后移植并在下一个 1.x 版本中提供,您可以在此处提交 JIRA