Lambda实现事务



我们创建了一个lambda,它按计划将消息从DL SQS队列移动到SQS队列(目标(。作为其中的一部分,我想实现事务。

基本上将消息复制到目标队列,然后删除DL队列(源(中的消息。但在任何情况下,在将消息复制到目标队列后,并且未能删除源队列中的消息,则应该从目标队列中删除该消息。

这是我的源代码

import json
import boto3
import sys
import sys
def get_messages_from_queue(sqs_client, queue_url, max_message_count):
"""Generates messages from an SQS queue.
Note: this continues to generate messages until the queue is empty.
Every message on the queue will be deleted.
:param queue_url: URL of the SQS queue to read.
See https://alexwlchan.net/2018/01/downloading-sqs-queues/
"""
processed_message_count = 0
while processed_message_count < max_message_count:
#print("Max Mesage Count: " + str(max_message_count))
remaining_message_count = max_message_count - processed_message_count
#print("Remaining messages: " + str(remaining_message_count))
receive_message_count = min(10, remaining_message_count)
get_resp = sqs_client.receive_message(
QueueUrl=queue_url, AttributeNames=["All"], MaxNumberOfMessages=receive_message_count
)
#print("Actual response:")
#print(get_resp)
try:
#print("Number of messages receieved: " + str(len(get_resp["Messages"])))
yield from get_resp["Messages"]
except KeyError:
return
entries = [
{"Id": msg["MessageId"], "ReceiptHandle": msg["ReceiptHandle"]}
for msg in get_resp["Messages"]
]
resp = sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=entries)
if len(resp["Successful"]) != len(entries):
raise RuntimeError(
f"Failed to delete messages: entries={entries!r} resp={resp!r}"
)

processed_message_count += len(get_resp["Messages"])
print("After deleting, number of processed messages are: " + str(processed_message_count))
def lambda_handler(event, context):
max_message_count = event['MSG_TRANSFER_LIMIT']
src_queue_url = event["SRC_QUEUE_URL"]
dst_queue_url = event["DEST_QUEUE_URL"]
if src_queue_url == dst_queue_url:
sys.exit("Source and destination queues cannot be the same.")
sqs_client = boto3.client("sqs")
#while processed_message_count < max_message_count:

for message in get_messages_from_queue(sqs_client, src_queue_url, max_message_count):
response = sqs_client.send_message(QueueUrl=dst_queue_url, MessageBody=message["Body"])
print(json.loads(message["Body"]['records'][0]))
#print(response)
return {
'ProcessedMessageCount': max_message_count
}

无法从Amazon SQS队列中检索特定消息。该代码将调用receive_messages()并获取队列中的任何内容。无法选择或筛选将返回的消息。

坦率地说,如果你担心源消息不会被删除,那么我建议实现再次尝试删除的重试代码。无法删除很可能是由于暂时的网络错误(重新尝试应该修复(,或者消息已经被删除。

最新更新