到 akka 端点的 Camel 路由抛出 ActorNotRegisteredException



我想把 Apache Camel 路由的端点设置为 akka 组件,例如 akka://some-system/user/myactor

我有以下代码:

package rsmev
import akka.actor.{ActorRef, Actor, Props, ActorSystem}
import akka.camel.{CamelExtension, CamelMessage, Consumer}
import org.apache.camel.builder.RouteBuilder
/** Minimal program that routes a message from a Camel `direct:` endpoint to an
  * actor addressed through an `akka://` endpoint URI.
  */
object Frontend {
  /** Creates the actor system and the target actor, registers the route,
    * starts the Camel context explicitly and sends one test message.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("my_system")
    // Created under the name "myconsumer"; the route below addresses it as
    // akka://my_system/user/myconsumer.
    system.actorOf(Props[ConsumerActor], "myconsumer")
    val context = CamelExtension(system).context
    context.addRoutes(new RouteBuilder() {
      override def configure(): Unit = {
        from("direct:start")
        .to("akka://my_system/user/myconsumer")
      }
    })
    // Explicitly start the context that was obtained from CamelExtension.
    context.start()
    // Pause 10 seconds before sending (presumably to let start-up finish).
    Thread.sleep(10 * 1000)
    val producer = context.createProducerTemplate()
    producer.sendBody("direct:start", "HELLO!")
    // Pause again before main returns (presumably so the actor can handle the message).
    Thread.sleep(10 * 1000)
  }
}
/** Actor used as the route target: prints "OK" for every message it receives. */
class ConsumerActor extends Actor {
  /** Message handler; the content of the message is ignored. */
  override def receive: Receive = { case _ => println("OK") }
}

执行此代码时,我得到:

  akka.camel.ActorNotRegisteredException: Actor [akka://survex_system/user/myconsumer] doesn't exist
  at akka.camel.internal.component.ActorProducer$$anonfun$actorFor$1.apply(ActorComponent.scala:175)
  at akka.camel.internal.component.ActorProducer$$anonfun$actorFor$1.apply(ActorComponent.scala:175)
  at scala.Option.getOrElse(Option.scala:121)
  at akka.camel.internal.component.ActorProducer.actorFor(ActorComponent.scala:175)
  at akka.camel.internal.component.ActorProducer.fireAndForget(ActorComponent.scala:172)
  at akka.camel.internal.component.ActorProducer.processExchangeAdapter(ActorComponent.scala:143)
  at akka.camel.internal.component.ActorProducer.process(ActorComponent.scala:120)
  at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:113)
  at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:416)
  at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
  at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
  at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:51)
  at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
  at org.apache.camel.processor.UnitOfWorkProducer.process(UnitOfWorkProducer.java:73)
  at org.apache.camel.impl.ProducerCache$2.doInProducer(ProducerCache.java:375)
  at org.apache.camel.impl.ProducerCache$2.doInProducer(ProducerCache.java:343)
  at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:233)
  at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:343)
  at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:184)
  at org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:124)
  at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:137)
  at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:144)
  at rsmev.Frontend$.main(Frontend.scala:17)
  at rsmev.Frontend.main(Frontend.scala)
Exception in thread "main" org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[Message: HELLO!]
  at org.apache.camel.util.ObjectHelper.wrapCamelExecutionException(ObjectHelper.java:1379)
  at org.apache.camel.util.ExchangeHelper.extractResultBody(ExchangeHelper.java:623)
  at org.apache.camel.impl.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:467)
  at org.apache.camel.impl.DefaultProducerTemplate.extractResultBody(DefaultProducerTemplate.java:463)
  at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:139)
  at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:144)
  at rsmev.Frontend$.main(Frontend.scala:17)
  at rsmev.Frontend.main(Frontend.scala)
Caused by: akka.camel.ActorNotRegisteredException: Actor [akka://survex_system/user/myconsumer] doesn't exist
  at akka.camel.internal.component.ActorProducer$$anonfun$actorFor$1.apply(ActorComponent.scala:175)
  at akka.camel.internal.component.ActorProducer$$anonfun$actorFor$1.apply(ActorComponent.scala:175)
  at scala.Option.getOrElse(Option.scala:121)
  at akka.camel.internal.component.ActorProducer.actorFor(ActorComponent.scala:175)
  at akka.camel.internal.component.ActorProducer.fireAndForget(ActorComponent.scala:172)
  at akka.camel.internal.component.ActorProducer.processExchangeAdapter(ActorComponent.scala:143)
  at akka.camel.internal.component.ActorProducer.process(ActorComponent.scala:120)
  at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:113)
  at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:416)
  at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
  at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
  at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:51)
  at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
  at org.apache.camel.processor.UnitOfWorkProducer.process(UnitOfWorkProducer.java:73)
  at org.apache.camel.impl.ProducerCache$2.doInProducer(ProducerCache.java:375)
  at org.apache.camel.impl.ProducerCache$2.doInProducer(ProducerCache.java:343)
  at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:233)
  at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:343)
  at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:184)
  at org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:124)
  at org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:137)
  ... 3 more

我不知道为什么会这样,因为名为 myconsumer 的 actor 已经注册了。

终于弄清楚了问题所在——不需要手动启动 Camel 上下文(即不要调用 context.start()),正确的用法如下:

package rsmev
import akka.actor.{ActorRef, Actor, Props, ActorSystem}
import akka.camel.{CamelExtension, CamelMessage, Consumer}
import org.apache.camel.builder.RouteBuilder
import akka.camel._
/** Working variant of the example: routes a message from a Camel `direct:`
  * endpoint to an actor addressed through an `akka://` endpoint URI, without
  * starting the Camel context by hand.
  */
object Frontend {
  /** Creates the actor system and the target actor, registers the route and
    * sends one test message.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("system")
    // Created under the name "myconsumer"; the route below addresses it as
    // akka://system/user/myconsumer.
    system.actorOf(Props[ConsumerActor], "myconsumer")
    val context = CamelExtension(system).context
    context.addRoutes(new RouteBuilder() {
      override def configure(): Unit = {
        from("direct:start")
          .to("akka://system/user/myconsumer")
      }
    })
    // NOTE(review): context.start() is intentionally not called here; the
    // context is presumably managed by CamelExtension - confirm against the
    // Akka Camel documentation.
    // Pause 5 seconds before sending (presumably to let start-up finish).
    Thread.sleep(5 * 1000)
    val producer = context.createProducerTemplate()
    producer.sendBody("direct:start", "HELLO!")
    // Pause again before main returns (presumably so the actor can handle the message).
    Thread.sleep(10 * 1000)
  }
}
/** Actor used as the route target: prints "OK" for every message it receives. */
class ConsumerActor extends Actor {
  /** Message handler; the content of the message is ignored. */
  override def receive: Receive = { case _ => println("OK") }
}

相关内容

  • 没有找到相关文章

最新更新