如何制作x小时的SQS延迟队列



我需要一个队列来处理x小时延迟后的消息。我需要一种数据驱动的、完全基于事件的方法,而不使用任何调度器等等。

场景是,我将一些实时数据发送到SNS主题,然后从那里发送到不同的SQS队列,由不同的AWS Lambda函数使用。

其中一个Lambda函数需要在延迟3小时后处理消息。但是,最长交货延迟时间为15分钟。如果我第一次阅读该消息,它将自动从SQS中删除,因为我正在使用事件源映射触发器来调用lambda函数。

所以,我想知道如何避免删除消息,并在第一次处理时使其不可见?

如有任何想法/帮助,我们将不胜感激。

亚马逊SQS不会执行您的请求。此外,我不建议做任何"把戏"来迫使它推迟。

我建议您查看AWS Step Functions。它可以协调AWS Lambda函数之间的交互,并可以配置为在调用AWS Lambda功能之前等待(睡眠(一段时间。

我测试了它。它似乎是可行的。我使用了下面的代码进行测试。然而,它似乎不是";一种良好的做法";做你想要实现的事情的方式。我看到两个主要问题:

  1. 机上按摩的限额为120000。因此,您几乎可以无限制地扩展您的SQS队列。

  2. 您的度量将充斥着错误调用,很难区分真正失败的调用和故意失败的调用。

因此,我将研究其他解决方案。

import json
import os
import time
import boto3
sqs = boto3.client('sqs')
queue_url = os.environ['QUEUE_URL']
new_visibility_timeout = 120 # seconds
def lambda_handler(event, context):
print(json.dumps(event))

current_time = time.time()

no_of_new_records = 0

for record in event['Records']:

msg_timestamp = float(record['attributes']['SentTimestamp'])/1000

msg_age = current_time - msg_timestamp

print(f"Message age: {msg_age} seconds")

if msg_age > new_visibility_timeout:

print("Message to be successfully processed and deleted from queue")

response = sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=record['receiptHandle']
)

print(response)
else:

print("Set long visibility timeout")

response = sqs.change_message_visibility(
QueueUrl=queue_url,
ReceiptHandle=record['receiptHandle'],
VisibilityTimeout=new_visibility_timeout
)

print(response)

no_of_new_records += 1
if no_of_new_records > 0:
raise Exception("Fail the lambda")

return {'statusCode': 200}

最新更新