云运行PubSub高延迟



我正在构建一个微服务应用程序,该应用程序由许多使用Node.js构建并在Cloud Run上运行的微服务组成。我以几种不同的方式使用PubSub:

  1. 用于每日流式传输数据。负责从不同广告服务(Facebook广告、领英广告等(收集分析数据的微服务使用PubSub将数据流式传输到负责将数据上传到Google BigQuery的微服务。还有一些服务通过将数据分割成更小的块来从CRM和其他服务流式传输更高负载的数据(>1Gb(
  2. 用于微服务之间关于不同事件的消息传递,这些事件不需要立即响应

早些时候,我在PubSub中遇到了一些微不足道的延迟。我知道这是一个悬而未决的问题,考虑到低消息吞吐量时长达数秒的延迟。但就我而言,我们谈论的是几分钟的延迟。

此外,我偶尔会收到错误消息

发布时收到错误:在收到任何响应之前,API google.pubsub.v1.Publisher的总超时时间超过60000毫秒。

在这种情况下,消息根本没有发送或被高度延迟。

这就是我的代码的样子。

const subscriptions = new Map<string, Subscription>();
const topics = new Map<string, Topic>();
const listenForMessages = async (
subscriptionName: string,
func: ListenerCallback,
secInit = 300,
secInter = 300
) => {
let logger = new TestLogger("LISTEN_FOR_MSG");
let init = true;
const _setTimeout = () => {
let timer = setTimeout(() => {
console.log(`Subscription to ${subscriptionName} cancelled`);
subscription.removeListener("message", messageHandler);
}, (init ? secInit : secInter) * 1000);
init = false;
return timer;
};
const messageHandler = async (msg: Message) => {
msg.ack();
await func(JSON.parse(msg.data.toString()));
// wait for next message
timeout = _setTimeout();
};
let subscription: Subscription;
if (subscriptions.has(subscriptionName)) {
subscription = subscriptions.get(subscriptionName);
} else {
subscription = pubSubClient.subscription(subscriptionName);
subscriptions.set(subscriptionName, subscription);
}
let timeout = _setTimeout();
subscription.on("message", messageHandler);
console.log(`Listening for messages: ${subscriptionName}`);
};
const publishMessage = async (
data: WithAnyProps,
topicName: string,
options?: PubOpt
) => {
const serializedData = JSON.stringify(data);
const dataBuffer = Buffer.from(serializedData);
try {
let topic: Topic;
if (topics.has(topicName)) {
topic = topics.get(topicName);
} else {
topic = pubSubClient.topic(topicName, {
batching: {
maxMessages: options?.batchingMaxMessages,
maxMilliseconds: options?.batchingMaxMilliseconds,
},
});
topics.set(topicName, topic);
}
let msg = {
data: dataBuffer,
attributes: options.attributes,
};
await topic.publishMessage(msg);
console.log(`Publishing to ${topicName}`);
} catch (err) {
console.error(`Received error while publishing: ${err.message}`);
}
};

listenerForMessage函数由HTTP请求触发。

我已经检查了什么

  1. PubSub客户端只在函数外创建一次
  2. 主题和订阅将重复使用
  3. 我让每个容器至少运行一个实例,以消除冷启动引发延迟的可能性
  4. 我试着增加容器的CPU和内存容量
  5. batchingMaxMessagesbatchingMaxMilliseconds设置为1
  6. 我检查了@googlecloud/pubsub的最新版本是否已安装

票据

  1. 高延迟问题仅在云环境中发生。通过本地测试,一切正常
  2. 超时错误有时同时出现在两种环境中

问题在于我对云运行容器生命周期的理解。当PubSub在后台工作时,我曾经发送HTTP响应202。发送响应后,容器切换到空闲状态,这在我的日志中看起来像是高延迟。

最新更新