ECS任务只能从SQS队列中拾取一条消息



我有一个看起来像这样的体系结构:

  • 消息一发送到SQS队列,ECS任务就会拾取并处理该消息
  • 这意味着,如果X条消息被发送到队列中,X个ECS任务将并行启动。ECS任务只能获取一条消息(根据我上面的代码)

ECS任务使用dockerizedPython容器,并使用boto3SQS客户端检索和解析SQS消息:

sqs_response = get_sqs_task_data('<sqs_queue_url>')
sqs_message = parse_sqs_message(sqs_response)
while sqs_message is not None:
# Process it
# Delete if from the queue

# Get next message in queue
sqs_response = get_sqs_task_data('<sqs_queue_url>')
sqs_message = parse_sqs_message(sqs_response)
def get_sqs_task_data(queue_url):
client = boto3.client('sqs')
response = client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1
)
return response
def parse_sqs_message(response_sqs_message):
if 'Messages' not in response_sqs_message:
logging.info('No messages found in queue')
return None

# ... parse it and return a dict
return {
data_1 = ..., 
data_2 = ...
}

总而言之,非常简单。

get_sqs_data()中,我明确指定我只想检索一条消息(因为一个ECS任务只需要处理一条消息)。在parse_sqs_message()中,我用测试队列中是否还有一些消息

if 'Messages' not in response_sqs_message:
logging.info('No messages found in queue')
return None

当队列中只有一条消息时(意味着触发了一个ECS任务),一切正常。ECS任务可以拾取、处理和删除消息。

但是,当队列中同时填充了X条消息(X > 1),会触发X个ECS任务,但只有ECS任务能够获取其中一条消息并进行处理。
其他所有ECS任务都将以No messages found in queue退出,尽管还有X - 1消息需要处理。

为什么?为什么其他任务无法拾取要拾取的邮件?

如果这很重要,则将SQS的VisibilityTimeout设置为30分钟。

如有任何帮助,我们将不胜感激!

如果你想的话,可以要求更高的精度。

我忘记回答那个问题了。

问题是SQS被设置为FIFO队列。FIFO队列一次只允许一个使用者(以保持消息的顺序)。将其更改为普通(标准)队列解决了此问题。

我不确定如何从SQS触发任务,但根据我在SQS SDK文档中的理解,如果使用短轮询时消息数量较少,则可能会发生这种情况。从get_sqs_task_data的定义中,我看到您正在使用短轮询。

短轮询是默认行为,其中加权随机机器集在ReceiveMessage调用上采样。因此,只有返回采样的机器。如果队列中的消息数很小(少于1000条),您收到的邮件很可能比您请求的要少每个ReceiveMessage调用。如果队列中的消息数量非常少,则可能不会在特定的ReceiveMessage响应中接收到任何消息。如果发生这种情况,请重复请求。

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs.html#SQS.Client.receive_message

您可能想尝试使用一个值高于可见性超时的长轮询

我希望它能帮助

相关内容

  • 没有找到相关文章

最新更新