我希望运行一个服务,该服务将使用放入SQS队列的消息。构建消费者应用程序的最佳方式是什么?
一种想法是创建一堆运行以下内容的线程或进程:
def run(q, delete_on_error=False):
while True:
try:
m = q.read(VISIBILITY_TIMEOUT, wait_time_seconds=MAX_WAIT_TIME_SECONDS)
if m is not None:
try:
process(m.id, m.get_body())
except TransientError:
continue
except Exception as ex:
log_exception(ex)
if not delete_on_error:
continue
q.delete_message(m)
except StopIteration:
break
except socket.gaierror:
continue
我还错过了什么重要的东西吗?在队列读取和删除调用中,我还需要防范哪些异常情况?其他人如何管理这些消费者?
我确实找到了这个项目,但它似乎停滞不前,而且有一些问题。
我倾向于单独的进程而不是线程来避免GIL。是否有一些容器进程可以用来启动和监视这些单独运行的进程?
有以下几点:
- SQS API允许您通过单个API调用接收多条消息(最多10条消息,或最多256k条消息,以先达到的限制为准)。利用此功能可以降低成本,因为每次API调用都会向您收费。看起来您正在使用boto库-请查看
get_messages
- 在您现在的代码中,如果处理消息由于暂时错误而失败,则在可见性超时到期之前,将无法再次处理该消息。您可能需要考虑立即将消息返回到队列。您可以通过在该消息上使用
0
调用change_visibility
来执行此操作。然后,该消息将可立即进行处理。(如果你这样做,那么该消息的可见性超时可能会永久更改-事实并非如此。AWS文档指出,"下次收到消息时,该消息的可视超时将恢复为原始超时值"。有关更多信息,请参阅文档。)
如果您正在寻找一个健壮的SQS消息使用者的示例,您可能需要查看NServiceBus.AmazonSQS(我是其作者)。(C#-对不起,我找不到任何python示例。)