当我使用batchItemFailures时,如何防止dynamoDB流处理程序无限处理记录



我有一个动态数据库流,它正在触发一个lambda处理程序,如下所示:

let failedRequestId: string
await asyncForEachSerial(event.Records, async (record) => {
try {
await handle(record.dynamodb.OldImage, record.dynamodb.NewImage, record, context)
return true
} catch (e) {
failedRequestId = record.dynamodb.SequenceNumber
}
return false //break;
})
return {
batchItemFailures:[ { itemIdentifier: failedRequestId } ]
}

我用DestinationConfig.onFailure设置了lambda,它指向我在SQS中配置的DLQ。处理程序背后的思想是处理一批事件,并在第一次失败时中断。然后,它报告"batchItemFailures"中的最新失败,该失败告诉流在下一次尝试时继续该记录。(我从这篇文章中提取了这个想法(

我目前的问题是,如果我的handle()函数在其中一条记录上真的失败了,那么我的退出代码将触发该记录作为下一个处理程序调用的检查点。然而,dlq条件永远不会触发,我最终一次又一次地处理该记录。我还应该注意到,由于handle()不是幂等的,我试图避免多次重新处理记录。

如何在维护批处理的同时优雅地处理错误,而不会为表现良好的流记录多次触发handle()函数?

我不确定你是否找到了你想要的答案。如果其他人遇到这个问题,我会做出回应。

为了避免这个问题,您还需要使用另外两个参数。报价文件(https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html):

重试次数–函数返回错误时Lambda重试的最大次数。这不适用于服务错误或批处理未达到功能的限制。

最长记录期限–Lambda发送给您的函数的记录的最长期限。

基本上,您必须指定应该重试失败的次数,以及Lambda应该查看事件的时间。

最新更新