Camel、Akka、JMS和延迟消息确认:我可以向代理确认以前处理过的消息吗



我使用的是Akka(最新的稳定版本)、akka-camel和JMS(就本次对话而言,假设它是ActiveMQ,但理想情况下,解决方案应该是通用的)。

用例

我有以下用例。在队列Q上,我收到这样的消息:

time:   1     2    3    4    5    6
      | A1 | B1 | C1 | C2 | A2 | B2 | .... 
         ^                        ^
       first                     latest

我的最终目标是在(A1,A2)(B1,B2)等中将它们配对在一起;尽管存在重复和未送达消息等复杂情况,但复杂的是,我必须确保,在匹配并处理完整个配对之前,代理将保留所有未确认的消息

示例

4,我接收并处理了4条消息,并成功地处理了对(C1, C2),但我仍然无法向代理确认任何消息,因为A1B1仍然不匹配且挂起,在JMS中,确认C2意味着确认C2之前的所有消息。事实上,我可以发送回的第一个确认是在时间5,当接收到A2时:此时我可以确认A1(并且只有A1,因为B1仍然未决)。

问题

现在,我似乎不知道如何通过akka-camel进行这种延迟异步确认。我一直在网上阅读,虽然我可以找到关于如何手动确认消息的解释(文档和示例),但没有任何内容显示如何向代理确认之前处理过的消息。

import akka.camel.{ CamelMessage, Consumer }
import akka.camel.Ack
import akka.actor.Status.Failure
class Consumer3 extends Consumer {
  override def autoAck = false
  def endpointUri = "jms:queue:test"
  def receive = {
    case msg: CamelMessage =>
      sender() ! Ack
      // on success
      // ..
      val someException = new Exception("e1")
      // on failure
      sender() ! Failure(someException)
  }
}

在这种情况下,Ackobject,其语义实际上只是:我确认当前消息,而我需要类似消息X现在已被确认的东西,其中X是以前的消息,但不一定是当前消息。

这个用例是通过akka-camel支持的还是我应该自己构建的?

感谢

听起来您想要的是activemq的ActiveMQSession.INDVIDUAL_ACKNOWLEDE.

遗憾的是INVIDUAL_ACKNOWLEDGE不是JMS规范的一部分,但根据这篇文章,它在每个队列的基础上被广泛采用。(我实际上还没有检查)

由于它不符合规格,akka不支持开箱即用,但它肯定会做出很好的贡献!

最新更新