我正在运行一个分析管道。
- 吞吐量为每秒11条消息。
- 我的Pub/Sub主题约有2M条消息。
- 80个GCE实例正在并行提取消息。
这是我的主题和订阅:
gcloud pubsub topics create pipeline-input
gcloud beta pubsub subscriptions create pipeline-input-sub
--topic pipeline-input
--ack-deadline 600
--expiration-period never
--dead-letter-topic dead-letter
我是这样拉消息的:
import { PubSub, Message } from '@google-cloud/pubsub'
const pubSubClient = new PubSub()
const queue: Message[] = []
const populateQueue = async () => {
const subscription = pubSubClient.subscription('pipeline-input-sub', {
flowControl: {
maxMessages: 5
}
})
const messageHandler = async (message: Message) => {
queue.push(message)
}
subscription.on('message', messageHandler)
}
const processQueueMessage = () => {
const message = queue.shift()
try {
...
message.ack()
} catch {
...
message.nack()
}
processQueueMessage()
}
processQueueMessage()
处理时间约7秒。
这是许多类似的dup案例之一。相同的消息被传递5(!!)次到不同的GCE实例:
- 03:37:42.377
- 03:45:20.883
- 03:48:14.262
- 04:01:33.848
- 05:57:45.141
所有5次消息都被成功处理,.ack()
ed。输出的消息比输入多50% !我很清楚"至少有一次"这个词。行为,但我认为它可能会重复0.01%的消息,而不是50%。
主题输入100%无重复项。我验证这个话题输入法和un-acked消息通过云监控的数量。数字匹配:在pub/sub主题中没有重复。
更新:
- 看起来所有这些副本都是由于备份截止日期到期而创建的。我有100%的把握,在600秒的最后期限之前,我确认了99.9%的消息。
预计会有一些重复,尽管50%的重复率肯定很高。第一个问题是,这些是发布端副本还是订阅端副本?前发布的消息时创建重试,导致多个发布相同的消息。这些消息将具有不同的消息id。后者是由向订阅者重新传递同一消息引起的。这些消息具有相同的消息ID(尽管返回ID不同)。
听起来你已经验证了这些是订阅端副本。因此,可能的原因,正如你提到的是一个过期的ack截止日期。问题是,为什么消息超过了返回截止日期?需要注意的一点是,在使用客户机库时,订阅中设置的返回截止日期并不是使用的截止日期。相反,客户端库尝试根据客户端库设置和第99百分位延迟来优化应答截止日期。然后它更新消息的租约,直到FlowControl
对象的max_lease_duration
属性传递到subscribe
方法。默认为1小时。
modifyAckDeadline
请求。重复的一个可能原因是客户端无法发送这些请求,这可能是由于机器上的过载。运行这条管道的机器还在做其他工作吗?如果是这样,它们可能在CPU、内存或网络方面过载,无法发送modifyAckDeadline
请求,也无法及时处理消息。
消息批处理也可能会影响您回退消息的能力。作为一种优化,Pub/Sub系统存储批量消息的确认,而不是单个消息。因此,必须确认批处理中的所有消息,才能确认所有消息。因此,如果您在一个批处理中有5条消息,并确认其中4条消息,但随后不回复最后一条消息,则所有5条消息都将被重新发送。有一些缓存可以尽量减少这种情况,但这仍然是一种可能性。有一篇Medium文章对此进行了更详细的讨论(参见"消息重发")。重复Rate"部分)。在收到消息之后,在调用ack
和nack
之前,通过打印消息ID来检查代码中所有消息是否都被打包了,这可能是值得的。如果您的消息是分批发布的,则单个消息可能会导致重新传递更多消息。
批处理和副本之间的耦合是我们正在积极改进的。我希望这个问题能在某个时候停止。同时,如果您可以控制发布者,您可以将批处理设置中的max_messages
属性设置为1,以防止消息的批处理。
如果这些都没有帮助,最好打开一个支持案例,并提供一些重复消息的项目名称、订阅名称和消息id。工程师可以更详细地调查为什么单个消息被重新传递。