Apache Beam Combine.perKey为同一个密钥生成多个输出



我有一个Apache Beam流作业(运行在GCP Dataflow上(,我在其中生成许多单独的操作kpi计数对象(每个对象表示一个计数1(,将它们窗口化5分钟左右,然后使用Combine.perKey转换对它们进行计数。一旦它们被计算在内,我就会将它们发送到一个远程管理服务,该服务维护这些指标,并提供可视化它们和设置警报的工具。

这就是操作度量计数对象的样子:

public class OperationalReportSingleInput implements Serializable {
private LinkedHashMap<String, String> metricLabels;
private String metricLabelKey;
}

这些操作度量计数对象是基于它们的"计数"来聚合/计数的;度量标签";。metricLabelKey字段只是metricLabels LinkedHashMap的字符串表示,在Combine.perKey变换的GroupByKey步骤中使用。

以下是与整个过程相关的所有步骤。你可以看到我第一次窗口所有OperationalReportSingleInput对象大约5分钟,一旦窗口被释放,我就会将每个OperationalReportSingle Input转换为metricLabelString的键/值->OperationalReportSingleInput,然后我计算每个密钥的所有度量,然后将它们发送到远程托管服务器:

public static void aggregateAndSendMetricsToMetricsService(PCollection<OperationalReportSingleInput> individualCountObjects,
JobConfig jobConfig) {
individualCountObjects
.apply("Window that aggregates counts for " + jobConfig.getMetricCollectionWindow() + " seconds",
Window.<OperationalReportSingleInput>into(new GlobalWindows()).triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(jobConfig.getMetricCollectionWindow()))))
.discardingFiredPanes())
.apply("Creating key/values for aggregation purposes.", WithKeys.of(new generateSingleCountUniqueKey()))
.apply("Aggregating based on key.", Combine.perKey(new SingleCountAggregator()))
.apply("Adding dummy key so all requests can be grouped and then processed by single machine.",
ParDo.of(new ApplyDummyKey())).apply(GroupByKey.create())
.apply("Write metrics to Remote Metrics Service.", ParDo.of(new WriteMetricsToMetricsService(jobConfig)));
}

当我在Dataflow(启用了自动缩放(中运行这段代码,其中有几十万条记录时,它的工作方式正如我所期望的那样。因为它聚合了每个metricLabelKey的所有OperationalReportSingleInput计数对象。因此,最终,每个唯一的metricLabelKey都有一个计数。但当我用几百万条记录运行这个时,问题就出现了。在这种情况下,我将获得单个唯一metricLabelKey的多个计数/输出。就好像在一定的规模下;组合";transform的";mergeAccumulators";梯级故障。

因此,与其从Combine.perKey步骤中获得[{key1->1500},{key2->456}],我将获得类似于[{key1->1000},{key1-<500},}key2->456}]的内容。

有没有人经历过这样的事情,或者能发现我做错了什么?

在这种情况下,发生的情况是您可能会得到延迟的数据。

想象一下,您的输入进入您的管道,如下所示:

# Time = 0
[(k1, 1), (k1, 1), (k2, 1)]
# Time = 1 + jobConfig.getMetricCollectionWindow() seconds
[(k1, 1)]

在这种情况下,您将看到以下输出

# Time = jobConfig.getMetricCollectionWindow() seconds
# The first pane fires
[(k1, 2), (k2, 1)]
# Time = 1 + 2* jobConfig.getMetricCollectionWindow() seconds
# The second pane fires
[(k1, 1)]

要修复您的管道,您可能需要回答:您希望如何处理延迟超过jobConfig.getMetricCollectionWindow()秒的数据?你想放下它吗?还是你想等它?

如果要删除此延迟数据,则可以删除触发器的Repeatedly部分。

如果要等待数据,则可以增加jobConfig.getMetricCollectionWindow()

想法?

最新更新