为什么我得到50%的GCP发布/订阅消息重复?



我正在运行一个分析管道。

  • 吞吐量为每秒11条消息。
  • 我的Pub/Sub主题约有2M条消息。
  • 80个GCE实例正在并行提取消息。

这是我的主题和订阅:

gcloud pubsub topics create pipeline-input
gcloud beta pubsub subscriptions create pipeline-input-sub 
--topic pipeline-input 
--ack-deadline 600 
--expiration-period never 
--dead-letter-topic dead-letter

我是这样拉消息的:

import { PubSub, Message } from '@google-cloud/pubsub'
const pubSubClient = new PubSub()
const queue: Message[] = []
const populateQueue = async () => {
const subscription = pubSubClient.subscription('pipeline-input-sub', {
flowControl: {
maxMessages: 5
}
})
const messageHandler = async (message: Message) => {
queue.push(message)
}
subscription.on('message', messageHandler)
}
const processQueueMessage = () => {
const message = queue.shift()
try {
...
message.ack()
} catch {
...
message.nack()
}
processQueueMessage()
}
processQueueMessage()

处理时间约7秒。

这是许多类似的dup案例之一。相同的消息被传递5(!!)次到不同的GCE实例:

  • 03:37:42.377
  • 03:45:20.883
  • 03:48:14.262
  • 04:01:33.848
  • 05:57:45.141

所有5次消息都被成功处理,.ack()ed。输出的消息比输入多50% !我很清楚"至少有一次"这个词。行为,但我认为它可能会重复0.01%的消息,而不是50%。

主题输入100%无重复项。我验证这个话题输入法和un-acked消息通过云监控的数量。数字匹配:在pub/sub主题中没有重复。

更新:

  1. 看起来所有这些副本都是由于备份截止日期到期而创建的。我有100%的把握,在600秒的最后期限之前,我确认了99.9%的消息。

预计会有一些重复,尽管50%的重复率肯定很高。第一个问题是,这些是发布端副本还是订阅端副本?前发布的消息时创建重试,导致多个发布相同的消息。这些消息将具有不同的消息id。后者是由向订阅者重新传递同一消息引起的。这些消息具有相同的消息ID(尽管返回ID不同)。

听起来你已经验证了这些是订阅端副本。因此,可能的原因,正如你提到的是一个过期的ack截止日期。问题是,为什么消息超过了返回截止日期?需要注意的一点是,在使用客户机库时,订阅中设置的返回截止日期并不是使用的截止日期。相反,客户端库尝试根据客户端库设置和第99百分位延迟来优化应答截止日期。然后它更新消息的租约,直到FlowControl对象的max_lease_duration属性传递到subscribe方法。默认为1小时。

因此,为了使消息保持租用状态,客户端库必须能够向服务器发送modifyAckDeadline请求。重复的一个可能原因是客户端无法发送这些请求,这可能是由于机器上的过载。运行这条管道的机器还在做其他工作吗?如果是这样,它们可能在CPU、内存或网络方面过载,无法发送modifyAckDeadline请求,也无法及时处理消息。

消息批处理也可能会影响您回退消息的能力。作为一种优化,Pub/Sub系统存储批量消息的确认,而不是单个消息。因此,必须确认批处理中的所有消息,才能确认所有消息。因此,如果您在一个批处理中有5条消息,并确认其中4条消息,但随后不回复最后一条消息,则所有5条消息都将被重新发送。有一些缓存可以尽量减少这种情况,但这仍然是一种可能性。有一篇Medium文章对此进行了更详细的讨论(参见"消息重发")。重复Rate"部分)。在收到消息之后,在调用acknack之前,通过打印消息ID来检查代码中所有消息是否都被打包了,这可能是值得的。如果您的消息是分批发布的,则单个消息可能会导致重新传递更多消息。

批处理和副本之间的耦合是我们正在积极改进的。我希望这个问题能在某个时候停止。同时,如果您可以控制发布者,您可以将批处理设置中的max_messages属性设置为1,以防止消息的批处理。

如果这些都没有帮助,最好打开一个支持案例,并提供一些重复消息的项目名称、订阅名称和消息id。工程师可以更详细地调查为什么单个消息被重新传递。

相关内容

最新更新