时间窗口不考虑事件时间



我正在尝试flink,以用于从CSV文件加载的(排序)时间戳的时间戳事件的基本聚合。

我告诉Flink使用活动时间:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

,然后我在keyedstream上使用一个时间窗口

val distances = signals
  .assignAscendingTimestamps(_.ts)
  .map(s => (s.mmsi, s.ts, getPortDistance(s)))
  .keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.seconds(1)))
  .sum(2).print()

问题在于,将窗口更改为10分钟,实际上会在此时间过去了!

我的理解是,通过明确告诉Flink作为事件时间使用时间戳字段,该操作将不取决于机器上的实时时间。我在这里错过了什么吗?

首先,您必须了解水印以及如何生成水印。

什么是水印?

一般而言,水印是一份声明,即到溪流中的那一点,所有时间戳都应该到达。一旦水印到达操作员,操作员就可以将其内部事件时间时钟提高到水印的价值。有关更多详细信息,请查看官方文件。

如何生成水印?

因为您调用gissionScendingTimestamps功能,这意味着您的水印是(最新收到的元素的时间戳-1)。因此,您将获得上升的水印,无法重新撤销订购元素。

如何解决这个问题?

定义您自己的水印时间戳Assginer。您可以查看" cossiscendingTimestamps"的详细实现,然后尝试编写自己的。

相关内容

  • 没有找到相关文章

最新更新