我有一个看起来像这样的体系结构:
- 消息一发送到SQS队列,ECS任务就会拾取并处理该消息
- 这意味着,如果X条消息被发送到队列中,X个ECS任务将并行启动。ECS任务只能获取一条消息(根据我上面的代码)
ECS任务使用dockerized
Python容器,并使用boto3
SQS客户端检索和解析SQS消息:
sqs_response = get_sqs_task_data('<sqs_queue_url>')
sqs_message = parse_sqs_message(sqs_response)
while sqs_message is not None:
# Process it
# Delete if from the queue
# Get next message in queue
sqs_response = get_sqs_task_data('<sqs_queue_url>')
sqs_message = parse_sqs_message(sqs_response)
def get_sqs_task_data(queue_url):
client = boto3.client('sqs')
response = client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1
)
return response
def parse_sqs_message(response_sqs_message):
if 'Messages' not in response_sqs_message:
logging.info('No messages found in queue')
return None
# ... parse it and return a dict
return {
data_1 = ...,
data_2 = ...
}
总而言之,非常简单。
在get_sqs_data()
中,我明确指定我只想检索一条消息(因为一个ECS任务只需要处理一条消息)。在parse_sqs_message()
中,我用测试队列中是否还有一些消息
if 'Messages' not in response_sqs_message:
logging.info('No messages found in queue')
return None
当队列中只有一条消息时(意味着触发了一个ECS任务),一切正常。ECS任务可以拾取、处理和删除消息。
但是,当队列中同时填充了X条消息(X > 1
)时,会触发X个ECS任务,但只有ECS任务能够获取其中一条消息并进行处理。
其他所有ECS任务都将以No messages found in queue
退出,尽管还有X - 1
消息需要处理。
为什么?为什么其他任务无法拾取要拾取的邮件?
如果这很重要,则将SQS的VisibilityTimeout
设置为30分钟。
如有任何帮助,我们将不胜感激!
如果你想的话,可以要求更高的精度。
我忘记回答那个问题了。
问题是SQS被设置为FIFO队列。FIFO队列一次只允许一个使用者(以保持消息的顺序)。将其更改为普通(标准)队列解决了此问题。
我不确定如何从SQS触发任务,但根据我在SQS SDK文档中的理解,如果使用短轮询时消息数量较少,则可能会发生这种情况。从get_sqs_task_data
的定义中,我看到您正在使用短轮询。
短轮询是默认行为,其中加权随机机器集在ReceiveMessage调用上采样。因此,只有返回采样的机器。如果队列中的消息数很小(少于1000条),您收到的邮件很可能比您请求的要少每个ReceiveMessage调用。如果队列中的消息数量非常少,则可能不会在特定的ReceiveMessage响应中接收到任何消息。如果发生这种情况,请重复请求。
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Client.receive_message
您可能想尝试使用一个值高于可见性超时的长轮询
我希望它能帮助