在SQS队列中使用许多使用者



我知道使用多个线程可以使用一个SQS队列。我想保证每条消息都会被消费一次。我知道可以更改消息的可见性超时,例如,等于我的处理时间。如果我的进程花费的时间超过可见性超时(例如,慢速连接),其他线程可以使用相同的消息。

保证消息处理一次的最佳方法是什么?

保证消息处理一次的最佳方法是什么?

您要求担保-您将无法获得担保。您可以将消息被多次处理的概率降低到非常小的,但您不会得到保证

我将解释原因,以及减少重复的策略。

重复从何而来

  1. 当您将消息放入SQS时,SQS实际上可能会多次接收该消息
    • 例如:发送消息时的一个小网络问题导致了一个临时错误,该错误会自动重试-从消息发送者的角度来看,它失败了一次,成功发送了一次。但SQS收到了这两条消息
  2. SQS可以在内部生成重复项
    • 类似于第一个例子-有很多计算机在暗中处理消息,SQS需要确保不会丢失任何信息-消息存储在多个服务器上,这可能会导致重复

在大多数情况下,通过利用SQS消息可见性超时,来自这些源的复制机会已经很小了,就像小了百分之一。

如果处理重复真的没有那么糟糕(努力让你的消息消费幂等!),我认为这已经足够好了——进一步减少重复的机会是复杂的,而且可能代价高昂。。。


您的应用程序可以做些什么来进一步减少重复

好的,我们下兔子洞。。。在高级级别上,您将希望为消息分配唯一的id,并检查正在进行或在开始处理之前已完成的id的原子缓存:

  1. 确保您的消息在插入时具有唯一标识符
    • 如果没有这个,你将无法区分重复项
  2. 在邮件的"末尾"处理重复。
    • 如果您的消息接收者需要在收件箱外发送消息以进行进一步处理,那么它可能是另一个重复源(原因与上述类似)
  3. 您将需要一个原子存储和检查这些唯一id的地方(并在一段时间后刷新它们)。有两种重要状态:"正在进行"one_answers"已完成">
    • InProgress条目应该有一个超时,这取决于在处理失败时恢复的速度
    • 根据重复数据消除窗口的长度,已完成的条目应有一个超时
    • 最简单的可能是Guava缓存,但只适用于单个处理应用程序。如果您有大量消息或分布式消费,请考虑为此作业创建一个数据库(带有后台进程来清除过期条目)
  4. 在处理消息之前,请尝试将消息ID存储在"正在进行"中。如果它已经在那里了,停下来——你刚刚处理了一个副本
  5. 检查消息是否为"Completed"(已完成)(如果存在则停止)
  6. 您的线程现在对该messageId具有独占锁定-处理您的消息
  7. 将messageId标记为"Completed"-只要此messageId保持在此处,就不会处理该messageId的任何重复项
    • 不过你可能负担不起无限的存储空间
  8. 从"正在进行"中删除messageId(或者让它从此处过期)

一些注意事项

  • 请记住,如果没有这些,复制的几率已经很低了。根据邮件重复数据消除的时间和金钱价值,您可以随意跳过或修改任何步骤
    • 例如,您可以省略"InProgress",但这打开了两个线程同时处理重复消息的小机会(第二个线程在第一个线程"完成"之前开始)
  • 重复数据消除窗口的长度与您可以将messageId保持在"Completed"(已完成)的时间一样长。由于您可能负担不起无限的存储空间,请将其至少持续2倍于您的SQS消息可见性超时;在那之后,重复的机会减少了(除了已经很低的机会之外,但仍然不能保证)
  • 即便如此,仍有重复的机会-所有预防措施和SQS消息可见性超时都有助于将这种机会减少到很小,但机会仍然存在:
    • 您的应用程序在处理消息后,但在messageId为"Completed"之前,可能会崩溃/挂起/执行很长的GC(可能您正在为此存储使用数据库,并且与该数据库的连接已断开)
    • 在这种情况下,"处理"最终将过期,另一个线程可以处理此消息(要么是在SQS可见性超时也过期之后,要么是因为SQS中有重复)

当您收到消息时,将消息或对消息的引用存储在对消息ID具有唯一约束的数据库中。如果表中存在该ID,则表示您已经收到了该ID,并且由于存在唯一约束,数据库将不允许您再次插入该ID。

AWS SQS API在使用API等读取消息时不会自动"消费"该消息。开发人员需要自己进行删除消息的调用。

SQS确实有一个称为"重新驱动策略"的功能,作为"死信队列设置"的一部分。您只需将读取请求设置为1。如果消费进程崩溃,对同一消息的后续读取将把消息放入死信队列。

SQS队列可见性超时可以设置为12小时。除非您有特殊需要,否则您需要实现将消息处理程序存储在数据库中的过程,以便进行检查。

您可以对消息和批处理使用setVisibilityTimeout(),以延长可见性时间,直到线程完成对消息的处理。

这可以通过使用scheduledExecutiorService来完成,并在初始可见性时间的一半之后调度可运行的事件。下面的代码片段创建并执行VisibilityTimeExtender,时间间隔为visibilityTime的一半。(该时间应保证消息得到处理,延长可视性时间/2)

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
ScheduledFuture<?> futureEvent = scheduler.scheduleAtFixedRate(new VisibilityTimeExtender(..), visibilityTime/2, visibilityTime/2, TimeUnit.SECONDS);

VisibilityTimeExtender必须实现Runnable,并且是更新新可见性时间的地方。

线程处理完消息后,您可以将其从队列中删除,并调用futureEvent.concel(true)来停止计划的事件。

最新更新