特定版本:PubSub/Dataflow对无边界数据的确认



我有一个项目,该项目有一个apache beam管道,其中的依赖项设置方式使得我必须使用PubSub的0.20.0-beta版本。此管道一直在运行(无限制(。

[+]问题:PubSub消息大约每30分钟重复一次。

[+]我尝试过的:我读过许多解决方案,其中提到了Dataflow运行程序如何在其中设置确认的检查点。我还读到,使用诸如GroupByKey之类的PTransform可以更快地确认这些消息。所以我尝试了按键进行窗口化、触发和分组,但我仍然从PubSub收到重复的消息。

[+]问题:我做错了什么?为什么消息没有得到确认?(如果我理解正确的话,在管道结束执行之前,它不会被确认??但我的管道需要很长时间,如何尽早确认?(

这是特定于0.20.0-beta的"版本"错误吗?还是我应该能够使用带窗口和触发的PubsubIO.Reader以便更早确认?

[+]代码:

窗口时间为10秒,PubSuback截止日期为60秒。

.apply("Listen_To_PubSub", PubsubIO.readStrings().fromSubscription(subscription))
.apply("Windowing", Window.<String> into(window).triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(timeLimit)).withAllowedLateness(Duration.ZERO).discardingFiredPanes())
.apply("DeleteFromBQ", ParDo.of(new DeleteFromBQ()))
.apply("Mapping", ParDo.of(new Mapping()))
.apply("GroupByKey", GroupByKey.<String,String>create())
.apply("Acknowledge", ParDo.of(new Grouped()))
.apply("DoSomething1", ParDo.of(new DoSomething1()))
.apply("Flatten_Iterable", Flatten.iterables())
.apply("DoSomething2", ParDo.of(new DoSomething2()))
.apply("DoSomething3", ParDo.of(new DoSomething3()))
.apply("DoSomething4", ParDo.of(new DoSomething4()))
.apply("Write_To_BigQuery", BigQueryIO.writeTableRows()
.to(output)
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
);

提前感谢!欢迎提供任何意见。

在应用这么多转换时,似乎超过了60秒的ack截止日期。要查看需要多长时间,我建议使用"记录管道消息"。我认为您可能需要尽快移动确认。

您可以做的另一件事是使用更高的机器类型来更快地处理消息。

因此,我最终通过将管道拆分为2来解决这个问题。

前半部分只侦听pubsubmessages |获取相关信息|写入另一个pubsub主题。

后半部分侦听这些消息,然后将这些消息中的信息用于管道的其余部分。

这种拆分不仅处理了对消息的确认,而且允许并行性更好地工作!

最新更新