使用Go阅读来自Google Pub Sub的所有可用消息



我正试图从谷歌pub-sub中的一个主题中获取所有可用的消息。但在go中,我找不到一种配置,一旦Pub-Sub中没有剩余的消息,就可以取消接收回调。

我认为一种方法是使用谷歌云监控Api从Pub Sub获取消息的总数,如本答案中所述。谷歌PubSub-在主题中计数消息,然后记录读取的消息数量,如果计数等于该数量,则调用cancel,但我不太确定这是否是正确的方法。

var mu sync.Mutex
received := 0
sub := client.Subscription(subID)
cctx, cancel := context.WithCancel(ctx)
err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
mu.Lock()
defer mu.Unlock()
fmt.Fprintf(w, "Got message: %qn", string(msg.Data))
msg.Ack()
received++
if received == TotalNumberOfMessages {
cancel()
}
})
if err != nil {
return fmt.Errorf("Receive: %v", err)
}

我也尝试过使用带超时的上下文,即在取消后,直到上下文截止日期未满足为止。

ctx, cancel := context.WithTimeout(ctx, 100*time.Second)
defer cancel()
err = subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
}

但话说回来,这并不能让我确信所有的消息都已处理完毕。

请提出一个可以确保订阅的解决方案。当Pub-Sub中没有剩余的消息时,接收功能停止。

我在以前的公司已经实现了这一点(遗憾的是,我不再有代码了,它在我以前的公司git…中(。

原理如下

msg := make(chan *pubsub.Message, 1)
sub := client.Subscription(subID)
cctx, cancel := context.WithCancel(ctx)
go sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
msg <- m
})
for {
select {
case res := <-msg:
fmt.Fprintf(w, "Got message: %qn", string(res.Data))
res.Ack()

case <-time.After(3 * time.Second):
fmt.Println("timeout")
cancel()
}
}

最新更新