部署SQS使用者



我希望运行一个服务,该服务将使用放入SQS队列的消息。构建消费者应用程序的最佳方式是什么?

一种想法是创建一堆运行以下内容的线程或进程:

def run(q, delete_on_error=False):
    while True:
        try:
            m = q.read(VISIBILITY_TIMEOUT, wait_time_seconds=MAX_WAIT_TIME_SECONDS)
            if m is not None:
                try:
                    process(m.id, m.get_body())
                except TransientError:
                    continue
                except Exception as ex:
                    log_exception(ex)
                    if not delete_on_error:
                        continue
                q.delete_message(m)
        except StopIteration:
            break
        except socket.gaierror:
            continue

我还错过了什么重要的东西吗?在队列读取和删除调用中,我还需要防范哪些异常情况?其他人如何管理这些消费者?

我确实找到了这个项目,但它似乎停滞不前,而且有一些问题。

我倾向于单独的进程而不是线程来避免GIL。是否有一些容器进程可以用来启动和监视这些单独运行的进程?

有以下几点:

  • SQS API允许您通过单个API调用接收多条消息(最多10条消息,或最多256k条消息,以先达到的限制为准)。利用此功能可以降低成本,因为每次API调用都会向您收费。看起来您正在使用boto库-请查看get_messages
  • 在您现在的代码中,如果处理消息由于暂时错误而失败,则在可见性超时到期之前,将无法再次处理该消息。您可能需要考虑立即将消息返回到队列。您可以通过在该消息上使用0调用change_visibility来执行此操作。然后,该消息将可立即进行处理。(如果你这样做,那么该消息的可见性超时可能会永久更改-事实并非如此。AWS文档指出,"下次收到消息时,该消息的可视超时将恢复为原始超时值"。有关更多信息,请参阅文档。)

如果您正在寻找一个健壮的SQS消息使用者的示例,您可能需要查看NServiceBus.AmazonSQS(我是其作者)。(C#-对不起,我找不到任何python示例。)

最新更新