我有一个Apache BEAM管道,我想从Google PubSub主题中读取它,应用重复数据消除,并在15分钟固定窗口(结束时(将消息发送到另一个PubSub主题。然而,为了让它与重复数据消除一起工作,问题是消息似乎会立即发送到主题,而不是等待15分钟结束。
即使在应用了Window.triggering(AfterWatermark.pastEndOfWindow())
之后,它似乎也不起作用(即消息会立即发出(。(参考:https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/transforms/windowing/Window.html)。
正在寻求有关我的代码出现问题的帮助?以下完整代码:
此外,假设重复数据消除以固定窗口为界限是否正确,或者我需要单独设置重复数据消除的时域(参考:https://beam.apache.org/releases/javadoc/2.21.0/org/apache/beam/sdk/transforms/Deduplicate.html似乎说它将默认为时域,这将是定义的固定窗口(
pipeline
.apply("Read from source topic", PubsubIO.<String>readStrings().fromTopic(SOURCE_TOPIC))
.apply("De-duplication ",
Deduplicate.<String>values()
)
.apply(windowDurationMins + " min Window",
Window.<String>into(FixedWindows.of(Duration.standardMinutes(windowDurationMins)))
.triggering(
AfterWatermark.pastEndOfWindow()
)
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes()
)
.apply("Format to JSON", ParDo.of(new DataToJson()))
.apply("Emit to sink topic",
PubsubIO.writeStrings().to(SINK_TOPIC)
);
[更新]尝试了以下操作,但似乎都不起作用
- 删除了重复数据消除
- 更改为
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
- 读取带有时间戳的主题属性:
PubsubIO.<String>readStrings().fromTopic(SOURCE_TOPIC).withTimestampAttribute("publishTime"))
窗口化似乎需要与每个数据元素相关联的某种时间戳。然而,PubsubIO的.withTimestampAttribute("publishTime")
似乎并不起作用。我是否可以尝试在数据中添加时间戳以进行窗口处理?
[更新2]尝试基于此引用手动附加时间戳(https://beam.apache.org/documentation/programming-guide/#adding-时间戳到一个pcollections-elements(,如下所示,但仍然不起作用
.apply("Add timestamp", ParDo.of(new ApplyTimestamp()))
public class ApplyTimestamp extends DoFn<String, String> {
@ProcessElement
public void addTimestamp(ProcessContext context) {
try {
String data = context.element();
Instant timestamp = new Instant();
context.outputWithTimestamp(data, timestamp);
} catch(Exception e) {
LOG.error("Error timestamping", e);
}
}
}
在这一点上,我觉得我快要疯了哈哈…
从源代码读取后的立即窗口和重复数据消除逻辑之间需要GBK转换。窗口应用于下一个GroupByKeys中,包括复合变换中的一个。GBK将通过键和窗口的组合对元素进行分组。此外,请注意,默认触发器已经是允许延迟为0的AfterWatermark