GCP Pubsub 在低消息/秒上的高延迟



我正在从 AppEngine 发布 Pubsub 消息 使用 JAVA 客户端库的灵活环境,如下所示:

Publisher publisher = Publisher
.newBuilder(ProjectTopicName.of(Utils.getApplicationId(), "test-topic"))
.setBatchingSettings(
BatchingSettings.newBuilder()
.setIsEnabled(false)
.build())
.build();
publisher.publish(PubsubMessage.newBuilder()
.setData(ByteString.copyFromUtf8(message))
.putAttributes("timestamp", String.valueOf(System.currentTimeMillis()))
.build());

我正在订阅数据流中的主题,并记录消息从 AppEngine 灵活到达数据流所需的时间

pipeline
.apply(PubsubIO.readMessagesWithAttributes().fromSubscription(Utils.buildPubsubSubscription(Constants.PROJECT_NAME, "test-topic")))
.apply(ParDo.of(new DoFn<PubsubMessage, PubsubMessage>() {
@ProcessElement
public void processElement(ProcessContext c) {
long timestamp = System.currentTimeMillis() - Long.parseLong(c.element().getAttribute("timestamp"));
System.out.println("Time: " + timestamp);
}
}));
pipeline.run();

当我以每秒几条消息的速度发布消息时,日志显示消息到达数据流所需的时间在 100 毫秒到 1.5 秒之间。 但是当速率约为每秒 100 条消息时,时间始终在 100ms - 200ms 之间,这似乎完全足够了。 有人可以解释这种行为吗?似乎关闭发布者批处理不起作用。

发布/订阅专为两种订阅案例的高吞吐量消息而设计。

当有大量消息时,拉取订阅效果最佳,这是当消息处理吞吐量是优先级时要使用的订阅类型。特别要注意的是,同步拉取不会在消息发布后立即处理消息,并且可以选择拉取和处理固定数量的消息(更多消息,更多拉取(。更好的选择是使用异步拉取,它使用长时间运行的消息侦听器并一次确认一条消息 [1]。

另一方面,推送订阅使用慢启动算法:每次成功传递时发送的消息数量都会翻倍,直到达到其限制(更多消息,更多和更快的交付(。


[1] https://cloud.google.com/pubsub/docs/pull#asynchronous-pull

最新更新