消费者/生产商AWS SQS Akka Scala和Synchrone消费者



我的应用程序有生产者和一个消费者。我的制作人不规则地产生信息。有时候我的队列将是空的,有时候我会收到几条消息。我想让我的消费者收听队列,当其中一条消息时,请接受并处理此消息。这个过程可能需要几个小时,如果我的消费者还没有完成当前消息,我不希望我的消费者再发出其他队列的消息。

我认为Akka和AWS SQS可以满足我的需求。通过阅读文档和示例,Akka-Camel似乎可以简化我的作品。

我在github上找到了这个示例。

我对消费者的配置更感兴趣(thread.sleep只是为了模拟我的处理):

class MySqsConsumer extends Consumer {
  //The SQS URI is an in-only message exchange (autoAck=true)
  override def endpointUri = "aws-sqs://sqs-akka-camel?  amazonSQSClient=#client"
  override def receive = {
    case msg: CamelMessage => {
      println("Start received %s" format msg.bodyAs[String])
      Thread.sleep(4000)
      println("Stop received %s" format msg.bodyAs[String])
    }
  }
 }

stacktrace:

...
14:38:53.335 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World1!]
14:38:54.051 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World2!]
14:38:54.753 [Camel (sqs-akka-camel) thread #0 - aws-sqs://sqs-akka-camel] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[akka://sqs-akka-camel/user/consumer?autoAck=true&replyTimeout=60000+milliseconds] Exchange[Message: Hello World3!]
Start received Hello World1!
Stop received Hello World1!
Start received Hello World2!
Stop received Hello World2!
Start received Hello World3!
...

我的问题是Akka-Camel在完成接收方法之前从队列中读取消息。

在发出新消息之前,如何使消费者等待流程的结束?如果我使用错误的工具/库,您能否将我定向到新工具?

不确定您是否正在使用Akka流,但是如果您是Akka流,请将源发送到sink.queue

val graph=yourSource.to(Sink.queue)

实现(graph.run)会返回凹槽时,您可以从中摘取手动项目。它将使用背压机制,因此您不必担心如果不拉动物品会发生什么。

注意:我故意忽略了仿制药!但是不要忘记它们。

据我所知,你做不到。骆驼一直在排队中消耗消息。

您可以设置override def autoAck = false-在确认当前的消息之前,它不会向receive()发送另一个消息。但是"隐藏"的骆驼演员仍将消耗。

也许您可以提供自定义的amazonSQSClient并以某种方式控制它。

最新更新