我需要一个队列来处理x小时延迟后的消息。我需要一种数据驱动的、完全基于事件的方法,而不使用任何调度器等等。
场景是,我将一些实时数据发送到SNS主题,然后从那里发送到不同的SQS队列,由不同的AWS Lambda函数使用。
其中一个Lambda函数需要在延迟3小时后处理消息。但是,最长交货延迟时间为15分钟。如果我第一次阅读该消息,它将自动从SQS中删除,因为我正在使用事件源映射触发器来调用lambda函数。
所以,我想知道如何避免删除消息,并在第一次处理时使其不可见?
如有任何想法/帮助,我们将不胜感激。
亚马逊SQS不会执行您的请求。此外,我不建议做任何"把戏"来迫使它推迟。
我建议您查看AWS Step Functions。它可以协调AWS Lambda函数之间的交互,并可以配置为在调用AWS Lambda功能之前等待(睡眠(一段时间。
我测试了它。它似乎是可行的。我使用了下面的代码进行测试。然而,它似乎不是";一种良好的做法";做你想要实现的事情的方式。我看到两个主要问题:
-
机上按摩的限额为120000。因此,您几乎可以无限制地扩展您的SQS队列。
-
您的度量将充斥着错误调用,很难区分真正失败的调用和故意失败的调用。
因此,我将研究其他解决方案。
import json
import os
import time
import boto3
sqs = boto3.client('sqs')
queue_url = os.environ['QUEUE_URL']
new_visibility_timeout = 120 # seconds
def lambda_handler(event, context):
print(json.dumps(event))
current_time = time.time()
no_of_new_records = 0
for record in event['Records']:
msg_timestamp = float(record['attributes']['SentTimestamp'])/1000
msg_age = current_time - msg_timestamp
print(f"Message age: {msg_age} seconds")
if msg_age > new_visibility_timeout:
print("Message to be successfully processed and deleted from queue")
response = sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=record['receiptHandle']
)
print(response)
else:
print("Set long visibility timeout")
response = sqs.change_message_visibility(
QueueUrl=queue_url,
ReceiptHandle=record['receiptHandle'],
VisibilityTimeout=new_visibility_timeout
)
print(response)
no_of_new_records += 1
if no_of_new_records > 0:
raise Exception("Fail the lambda")
return {'statusCode': 200}