我很好奇如何在应用程序使用来自Google Pub/Sub的消息时处理升级/重启情况。
例如,我特别感兴趣的是开发一个Golang应用程序,该应用程序部署在运行多个pod的Kubernetes中,并使用来自GooglePub/Sub的消息。我关心的是,在升级pod时,如何确保没有消息丢失(或处理两次(。
我知道应用程序会从订阅中读取消息,然后必须确认已收到消息。我觉得在确认消息和pod关闭升级之间可能存在竞争条件?
我知道用Dataflow作业做类似的事情是可能的,因为你可以停止流作业并向它发出信号以排出消息。
我认为必须有某种方法来优雅地处理这一问题,或者这真的是Dataflow更适合的情况吗?
Kubernetes使用SIGTERM,等待30秒,然后使用SIGKILL。这为您的应用程序在完全终止之前提供了适当的时间,如果30秒的默认值不够,您可以使用terminationGracePeriodSeconds: 60
字段进行调整(链接1(。
然后,您需要在Golang中添加逻辑来接收SIGTERM信号(链接2(。
最后,假设您的队列是rabbit(但其他队列也有类似的功能(,在收到SIGTERM时,您可以将逻辑写入a(停止接收新消息,然后B((这是可选的,您可以让它们完成(,为pod当前已确认但尚未完成的所有消息返回NACK和Requeue信号,将消息放回(链接3和4(。
如果您可以避免实现NACK/Requeue,只需通过关闭队列侦听器并完成当前保留的消息的剩余部分来处理SIGTERM(比如说30或60秒就足够了(,那么这要简单得多,也是值得推荐的。
-
https://cloud.google.com/blog/products/containers-kubernetes/kubernetes-best-practices-terminating-with-grace
-
Golang捕获信号
-
rabbitMQ 中的Ack或Nack
-
https://www.rabbitmq.com/nack.html
**编辑**
对于谷歌云pub/sub,你也可以发送一个Nack。
https://pkg.go.dev/cloud.google.com/go/pubsub#Message
"Ack表示消息处理成功。如果消息确认失败,将重新发送消息。Nack表示客户端将不会或无法处理消息。Nack将导致消息比允许其过期时更快地重新传递">