以下是WatermarkGenerator
、的定义
@Public
public interface WatermarkGenerator<T> {
/**
* Called for every event, allows the watermark generator to examine and remember the
* event timestamps, or to emit a watermark based on the event itself.
*/
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
/**
* Called periodically, and might emit a new watermark, or not.
*
* <p>The interval in which this method is called and Watermarks are generated
* depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
*/
void onPeriodicEmit(WatermarkOutput output);
}
对于onEvent
方法,为什么在方法签名中有一个冗余的参数long eventTimestamp
,我认为事件时间应该能够从T event
中提取(具体的事件类型应该与事件时间一起携带(,并且它应该等于eventTimestamp,所以我会问,既然我可以从事件中获得它,为什么需要这个冗余参数long eventTimestamp
,这里的设计考虑是什么?
传递给onEvent
方法的时间戳是包裹事件的StreamRecord
信封中的当前时间戳。这是之前分配给该事件的任何时间戳——例如,在Kafka的情况下,这可能是Kafka记录头中的时间戳值。
虽然这通常是冗余信息,但在某些情况下,访问先前分配给上游的时间戳是有用的。
我认为逻辑是让WatermarkGenerator
不对事件时间的来源做出任何假设。
一个典型的情况是,作为开发人员,我们提前几个步骤提供了TimestampAssigner
,以从每个事件中提取事件时间。即使在这种情况下,也可能不希望在WatermarkGenerator
中重复这种逻辑,以避免耦合,特别是如果过程比只读取单个字段更复杂的话。所以这是在这里提供它的第一个动机。
另一种典型的情况是,事件时间由数据源本身提供,独立于每个事件中的任何字段。例如,可以选择使用Kafka连接器就是这样一种方式,让它从Kafka时间戳元数据中获得事件时间,而不依赖于事件负载。