Node.js Google PubSub订阅服务器未接收到某些消息



摘要:

我的Node.js应用程序中有聊天功能,我想通过socket.io向客户端发送消息。我通过PubSub触发向客户端的发送。现在,当运行PubSub Subscription时,大约70%的情况下一切正常(即打印消息(,但有时会停止并不做任何事情(特别是它也不会打印错误(。我可以确认,丢失的30%(消息(正在发布到该主题,因为同一主题的不同订阅会收到消息。

如有任何调试方面的帮助,我们将不胜感激。

详细信息

这是我的堆栈:

  • Node.js
  • 插座.io
  • express.js
  • passport.js
  • MongoDB
  • react.js

过程:

  • Anna在聊天中发送消息(这会写入数据库并发布到PubSub主题"消息"(
  • Node.js express应用程序运行订阅,然后根据消息内容发送给其他应该接收消息的人
  • BoB在这种情况下,与Anna在同一频道上的人将收到消息

为什么我不直接从Anna发射到Bob?原因是我想在消息之间有一个AppEngine,并可能在那里添加一些逻辑,这似乎是一个很好的方法

订阅

const pubSubClient = require('./client');
const errorHandler = function(error) {
console.error(`ERROR: ${error}`);
throw error;
};
module.exports = listenForMessages =(subscriptionName="messageReceiver",io) => {
const subscription = pubSubClient.subscription(subscriptionName);
// Listen for new messages until timeout is hit
subscription.on("message", (message) => {
console.log(`Received message ${message.id}:`);
const parsedMessage = JSON.parse(message.data)
parsedMessage._id = message.id
console.log(parsedMessage)
if (parsedMessage.to === "admin") {
io.to(`admin:${parsedMessage.from}`).emit("NewMessage", parsedMessage);
} else {
io.to(`${parsedMessage.to}`).emit("NewMessage", parsedMessage);
}
message.ack();
});
subscription.on('error', errorHandler);
}

服务器.js

...
const listenForMessages = require("./message_processing/listen");
listenForMessages("messageReceiver", io);

示例控制台输出

以下控制台输出是通过在本地运行该应用程序生成的,两个浏览器[一个以匿名方式]相互聊天。可以看出,实际上只有最后一条消息被侦听器拾取(并打印出来(。有趣的是,由于调用的异步性,接收到的消息的打印输出早于发送消息的日志(即,延迟在这里肯定不是问题(。

[0] went into deserialize user
[0] Message 875007020424601 published.
[0] went into deserialize user
[0] Message 875006704834317 published.
[0] went into deserialize user
[0] Message 875006583857400 published.
[0] went into deserialize user
[0] Message 875006520104287 published.
[0] went into deserialize user
[0] Message 875006699141463 published.
[0] went into deserialize user
[0] Received message 875006881073134:
[0] {
[0]   from: '5e949f73aeed81beefaf6daa',
[0]   to: 'admin',
[0]   content: 'i6',
[0]   seenByUser: true,
[0]   type: 'message',
[0]   createdByUser: true,
[0]   createdAt: '2020-04-20T07:44:54.280Z',
[0]   _id: '875006881073134'
[0] }
[0] Message 875006881073134 published.

在其他一些情况下,早期的消息起作用,然后侦听器似乎停止了。

您可以做几件事来检查发生了什么:

  1. 转到主题页面,选择主题以查看详细信息,并查看发布率。就Pub/Sub而言,确保您认为正在发布的消息实际上是成功发布的。如果发布失败,则可能会将它们传递给其中一个订阅者,而不是另一个订阅者
  2. 转到订阅页面,选择订阅以查看详细信息,并查看"未确认邮件计数"one_answers"最旧未确认邮件年龄"图。如果这些值为非零,则意味着有消息无法发送到您的订阅者。如果它们为零,那么这意味着消息将被发送到您的订户并由其确认

如果未确认消息的数量为零,则可能的原因是在接收消息的订阅上充当订阅者的流氓进程。也许您的服务的前一个实例仍在意外运行?或者可能另一个应该使用不同订阅的任务正在使用相同的订阅。

另一件需要注意的事情是,订阅者将只接收在消息发布之前创建的订阅中的消息。因此,如果您启动了发布者并发布了一些消息,然后创建了订阅,例如在订阅服务器启动时,则订阅服务器将不会收到这些早期消息。

最新更新