我有一个用Kafka连接器配置的Flink管道。
我已经使用将水印生成频率设置为2秒
env.getConfig().setAutoWatermarkInterval(2000);
现在,对于流窗口,我的翻滚窗口是60秒,我们在流窗口中进行一些聚合,并根据其中一个数据字段的时间戳进行基于事件时间的处理。
我没有为我的水印策略或流配置allowedTlateness。
final ConnectorConfig topicConfig = config.forTopic("mytopic");
final FlinkKafkaConsumer<MyPojo> myEvents = new FlinkKafkaConsumer<>(
topicConfig.name(),
AvroDeserializationSchema.forSpecific(MyPojo.class),
topicConfig.forConsumer()
);
myEvents.setStartFromLatest();
myEvents.assignTimestampsAndWatermarks(
WatermarkStrategy
.<MyPojo>forBoundedOutOfOrderness(
Duration.ofSeconds(30))
.withIdleness(Duration.ofSeconds(120))
.withTimestampAssigner((evt, timestamp) -> evt.event_timestamp_field));
Q.1根据我所读到的内容,我时间0-60的窗口将在90秒后计算,在120秒时计算30-90,依此类推。然而,由于我们正在进行翻滚窗口,即没有重叠,我猜测没有30-90窗口,0-60之后的下一个窗口是在150秒时触发的60-120,我说得对吗?
Q.2如果没有allowedTlateness,所有延迟数据都将被丢弃。例如,时间戳为45的事件在90秒后到达,则被认为是无序的,并且将超出第一个窗口,即0-60。对于窗口60-120,事件时间戳不匹配,因此它将被丢弃,并且不包括在150秒时触发的窗口中,我说得对吗?
Q.3.对于源空闲持续时间,我选择120,表示如果主题的任何Kakfa分区有数据处于非活动状态,则在2分钟后将其标记为空闲,然后发送其他活动分区的水印。我的问题是选择这个数字,即2分钟,以及它是否与窗口持续时间(60秒(或无序(30秒(有关。如果是这样的话,我应该在这里记住什么,以便进行适当的选择,这样我就不会因为空闲分区导致的未推进水印而导致数据延迟?
或者120的等待时间太长,我可能会错过数据,因此我应该将其设置为远小于OutOfOrderness持续时间的值,以确保0数据丢失?
编辑:添加了更多的代码
Q1:是的,这是正确的。
Q2:是的,这也是正确的。
Q3:这里的细节取决于你是否让Kafka源应用WatermarkStrategy,在这种情况下,它将按分区进行水印,或者WatermarkStrategie是否作为一个单独的操作符部署在源操作符之后(通常紧接在源操作符后面(。
在第一种情况下(使用FlinkKafkaConsumer
进行的每分区水印(,您将执行以下操作:
FlinkKafkaConsumer<MyType> kafkaSource = new FlinkKafkaConsumer<>(...);
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy ...);
DataStream<MyType> stream = env.addSource(kafkaSource);
而在源之后单独进行水印处理,看起来是这样的:
DataStream<MyType> events = env.addSource(...);
DataStream<MyType> timestampedEvents = events
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<MyType>forBoundedOutOfOrderness(Duration ...)
.withTimestampAssigner((event, timestamp) -> event.timestamp));
当在每个分区的基础上进行水印时,单个空闲分区将为处理该分区的使用者/源实例保留水印,直到空闲超时开始(在您的示例中为120秒(。相比之下,如果水印是在链接到源的单独运算符中完成的,那么只有当分配给该源实例的所有分区(具有空闲分区的分区(都是空闲的时,水印才会被保留(再次保留120秒(。
但不管这些细节如何,希望不会有数据丢失。在一段时间内,窗口不会被触发(因为水印没有前进(,但事件将继续被处理并分配给相应的窗口。水印恢复后,这些窗口将关闭并交付结果。
数据丢失的情况是,如果分区空闲,因为上游的一些故障导致了中断,最终产生了一系列后期事件。空闲超时到期后,水印将前进,如果源空闲是因为上游发生了故障(而不是因为根本没有事件(,那么最终到达的事件将延迟(除非有界无序延迟足够大以容纳它们(。如果您选择忽略后期事件,则这些事件将丢失。