使用pub sub调用函数时如何避免内存泄漏?



我在使用pubsub触发函数时遇到性能问题。

//this will call on index.ts
export function downloadService() {
// References an existing subscription
const subscription = pubsub.subscription("DOWNLOAD-sub");
// Create an event handler to handle messages
// let messageCount = 0;
const messageHandler = async (message : any) => {
console.log(`Received message ${message.id}:`);
console.log(`tData: ${message.data}`);
console.log(`tAttributes: ${message.attributes.type}`);
// "Ack" (acknowledge receipt of) the message
message.ack();
await exportExcel(message);//my function
// messageCount += 1;
};

// Listen for new messages until timeout is hit
subscription.on("message", messageHandler);
} 
async function exportExcel(message : any) {
//get data from database
const movies = await Sales.findAll({
attributes: [
"SALES_STORE",
"SALES_CTRNO",
"SALES_TRANSNO",
"SALES_STATUS",
],
raw: true,
});
... processing to excel// 800k rows
... bucket.upload to gcs
}

如果我只触发一个pubsub消息,上面的函数工作得很好。但是,如果在短时间内触发多个pubsub消息,该函数会遇到内存泄漏或数据库连接超时问题。

我发现的问题是,第一个处理还没有完成,但是来自pubsub的其他请求将直接再次调用函数并同时处理。

我不知道如何解决这个问题,但我在想实现队列工作者或谷歌云任务将解决问题?

正如@chovy在评论中提到的,有必要将excelExport函数调用排队,因为函数的执行跟不上调用的速度。async是一个可以用来对函数调用进行排队的模块。请注意,async模块不被Google正式支持。

作为替代方案,您可以在订阅者端使用流量控制功能。数据管道经常会收到发布流量的零星峰值,这可能会使订阅者在努力赶上时不堪重负。对订阅的高发布吞吐量的通常响应是动态地自动扩展订阅者资源以消耗更多消息。但是,这可能会产生不必要的成本—例如,您可能需要使用更多的VM—这可能会导致额外的容量规划。订阅者端的流控制功能可以通过允许订阅者调节消息摄取的速率来帮助控制管道上这些任务的不健康行为。有关流量控制功能的更多信息,请参阅本博客。

最新更新