我很难理解Beam的窗口应该如何与丢弃延迟数据相关。
下面是我的管道:
pipeline
| ReadFromPubSub()
| WindowInto(
FixedWindow(60),
trigger=AfterWatermark(
early=AfterCount(1000),
late=AfterProcessingTime(60),
),
accumulation_mode=AccumulationMode.DISCARDING,
allowed_lateness=10*60,
)
| GroupByKey()
| SomeTransform()
| Map(print)
我的理解是,来自Pub/Sub的数据元素被分配了它们发布到订阅时的时间戳。
当我启动管道来消费来自Pub/Sub的数据时,其中也包括旧数据,我希望只有最近10分钟的数据(如allowed_lateness所设置的)输出WindowInto的参数。但是结果是所有订阅的数据都被打印出来了。
我错过了什么还是我理解窗口错误?
使用Python和Beam 2.42.0
在上述管道中,没有后期数据
数据只在与水印相比时延迟。为了计算水印,Beam使用正在读取的消息的时间戳。在您的示例中,是在ReadFromPubsub
转换中生成消息时的时间戳。
关于时间戳属性如何与Pubsub一起使用的更多细节,请参阅SO中的这个问题(对于Python,检查ReadFromPubsub
中的参数timestamp_attribute
)
如果您正在读取的消息中没有时间戳属性,但是它们包含一个带有时间戳的字段(在有效载荷中),您可以使用它来生成时间戳值(解析有效载荷,然后使用发出TimestampedValue
或WithTimestamps
转换的DoFn
)。