如何确保一次最多处理一条消息



我想知道如何在Go中使用谷歌的pub/sub功能一次处理一条消息。我正在使用官方图书馆,https://pkg.go.dev/cloud.google.com/go/pubsub#section-自述。该事件由运行多个实例的服务使用,因此任何内存中的锁定机制都将不起作用。

我意识到这样做是一种反模式,所以让我解释一下我的用例。使用mongoDB,我将对象数组存储为每个实体的嵌入文档。正在发布的事件正在修改并保存此数组的一部分。如果我一次收到多个事件,并且它们同时开始处理,则其中一个保存将覆盖另一个。因此,我想解决这个问题的办法是确保一次只处理一条消息,最好使用云pub/sub中的任何内置功能来做到这一点。否则,我想在DB中实现一些锁定机制,但我希望避免这种情况。

如有任何帮助,我们将不胜感激。

你可以想象两件事:

  • 您可以在PubSub中使用排序键。与此类似,与同一对象相关的所有消息都将按顺序逐一传递
  • 您可以使用PubSub的PUSH订阅来推送到Cloud Run或Cloud Functions。使用Cloud Run,将并发性设置为1(默认情况下,使用Cloud Functions gen1(,并将最大实例设置为1。就像你一次只能处理一条消息一样,所有其他消息都会被拒绝(429HTTP错误代码(,并被重新排队到PubSub。问题是,您可以像以前一样使用排序键并行处理

类似的、更容易实现的事情是使用Cloud Tasks而不是PubSub。使用Cloud Tasks,您可以设置队列的速率限制,并将maxConcurrentDispatches设置为1(您不必对Cloud Functions max instances或Cloud Run max instances and concurrency执行相同操作(

相关内容

最新更新