消息批处理Apache BEAM管道会立即触发,而不是在固定窗口之后触发



我有一个Apache BEAM管道,我想从Google PubSub主题中读取它,应用重复数据消除,并在15分钟固定窗口(结束时(将消息发送到另一个PubSub主题。然而,为了让它与重复数据消除一起工作,问题是消息似乎会立即发送到主题,而不是等待15分钟结束。

即使在应用了Window.triggering(AfterWatermark.pastEndOfWindow())之后,它似乎也不起作用(即消息会立即发出(。(参考:https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/transforms/windowing/Window.html)。

正在寻求有关我的代码出现问题的帮助?以下完整代码:

此外,假设重复数据消除以固定窗口为界限是否正确,或者我需要单独设置重复数据消除的时域(参考:https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/transforms/Deduplicate.html似乎说它将默认为时域,这将是定义的固定窗口(

pipeline
.apply("Read from source topic", PubsubIO.<String>readStrings().fromTopic(SOURCE_TOPIC))
.apply("De-duplication ", 
Deduplicate.<String>values()
)
.apply(windowDurationMins + " min Window", 
Window.<String>into(FixedWindows.of(Duration.standardMinutes(windowDurationMins)))
.triggering(
AfterWatermark.pastEndOfWindow()
)
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes()
)
.apply("Format to JSON", ParDo.of(new DataToJson()))
.apply("Emit to sink topic",
PubsubIO.writeStrings().to(SINK_TOPIC)
);

[更新]尝试了以下操作,但似乎都不起作用

  • 删除了重复数据消除
  • 更改为.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
  • 读取带有时间戳的主题属性:PubsubIO.<String>readStrings().fromTopic(SOURCE_TOPIC).withTimestampAttribute("publishTime"))

窗口化似乎需要与每个数据元素相关联的某种时间戳。然而,PubsubIO的.withTimestampAttribute("publishTime")似乎并不起作用。我是否可以尝试在数据中添加时间戳以进行窗口处理?

[更新2]尝试基于此引用手动附加时间戳(https://beam.apache.org/documentation/programming-guide/#adding-时间戳到一个pcollections-elements(,如下所示,但仍然不起作用

.apply("Add timestamp", ParDo.of(new ApplyTimestamp()))

public class ApplyTimestamp extends DoFn<String, String> {
@ProcessElement
public void addTimestamp(ProcessContext context) {
try {
String data = context.element();
Instant timestamp = new Instant();
context.outputWithTimestamp(data, timestamp);
} catch(Exception e) {
LOG.error("Error timestamping", e);
}
}
}

在这一点上,我觉得我快要疯了哈哈…

从源代码读取后的立即窗口和重复数据消除逻辑之间需要GBK转换。窗口应用于下一个GroupByKeys中,包括复合变换中的一个。GBK将通过键和窗口的组合对元素进行分组。此外,请注意,默认触发器已经是允许延迟为0的AfterWatermark

最新更新