在Akka中向ClusterRouter中的路由广播消息



我正在尝试向ClusterRouter配置中的所有路由广播消息。我已经尝试了两种选择。这一个:

 val workerRouter = context.actorOf(Props[ClusterRouter].withRouter(
    ClusterRouterConfig(AdaptiveLoadBalancingRouter(metrics), ClusterRouterSettings(
      totalInstances = 100, routeesPath = "/user/slave",
      allowLocalRoutees = true, useRole = None))), name = "slaveRouter")
  context.system.scheduler.schedule(2 seconds, 5 seconds, workerRouter, Broadcast(CapabilityRequest))

还有这个

 val broadcastRouter = context.actorOf(Props[ClusterRouter].withRouter(
    ClusterRouterConfig(BroadcastRouter(Nil), ClusterRouterSettings(
      totalInstances = 100, routeesPath = "/user/slave",
      allowLocalRoutees = true, useRole = None))), name = "slaveRouter")
  context.system.scheduler.schedule(2 seconds, 5 seconds, broadcastRouter, CapabilityRequest)

但是对于它们两个,只有一个slaves接收到消息。想法吗?


为了理解为什么我认为第一次尝试应该有效,人们必须看看AdaptiveLoadBalancingRounter.scala,在AdaptiveLoadBalancingRouterLike特征中,当Route被创建时:

{
  case (sender, message) ⇒
    message match {
      case Broadcast(msg) ⇒ toAll(sender, routeeProvider.routees)
      case msg            ⇒ List(Destination(sender, getNext()))
    }
}

在你的第一个例子中,你使用的路由器只会向一个路由者发送信息。从我读过的文档来看,这个路由器将使用来自不同节点的可用指标来选择似乎受到最小胁迫的节点,并向该节点上的路由者发送消息。我认为你在这个设置中看到的行为是预期的。

对于第二个示例,我没有在文档中看到关于在集群环境中使用BraodcastRouter的任何内容,因此我不确定是否支持这种方法。话虽如此,我的猜测是,创建BraodcastRouter与路由的空列表(Nil)是什么导致你所看到的行为。我想如果你把它改成BroadcastRouter(100),你可能会看到不同的行为。但是,我不认为(基于文档中缺乏示例)支持使用BroadcastRouter(我可能错了)。

你能多解释一下你的用例吗,这样我就能理解为什么你的集群需要一个广播类型的路由器了?

编辑

fww,我得到的东西与以下代码工作。首先,配置:

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    transport = "akka.remote.netty.NettyRemoteTransport"
    log-remote-lifecycle-events = off
    netty {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  cluster {
    min-nr-of-members = 2
    seed-nodes = [
      "akka://ClusterSystem@127.0.0.1:2551", 
      "akka://ClusterSystem@127.0.0.1:2552"]
    auto-down = on
  }
}
然后,我使用以下代码启动了两个节点(一个在2551上,另一个在2552上):
object ClusterNode {
  def main(args: Array[String]): Unit = {
    // Override the configuration of the port 
    // when specified as program argument
    if (args.nonEmpty) System.setProperty("akka.remote.netty.port", args(0))

    // Create an Akka system
    val system = ActorSystem("ClusterSystem")
    val clusterListener = system.actorOf(Props(new Actor with ActorLogging {
      def receive = {
        case state: CurrentClusterState =>
          log.info("Current members: {}", state.members)
        case MemberJoined(member) =>
          log.info("Member joined: {}", member)
        case MemberUp(member) =>
          log.info("Member is Up: {}", member)
        case UnreachableMember(member) =>
          log.info("Member detected as unreachable: {}", member)
        case _: ClusterDomainEvent => // ignore
      }
    }), name = "clusterListener")
    Cluster(system).subscribe(clusterListener, classOf[ClusterDomainEvent])    
  }
}
class FooActor extends Actor{
  override def preStart = {
    println("Foo actor started on path: " + context.self.path)
  }
  def receive = {
    case msg => println(context.self.path + " received message: " + msg)
  }
}
然后我启动了第三个"节点",我的客户端节点,使用以下代码:
object ClusterClient {
  def main(args: Array[String]) {
    val system = ActorSystem("ClusterSystem")
    Cluster(system) registerOnMemberUp{
      val router = system.actorOf(Props[FooActor].withRouter(
        ClusterRouterConfig(AdaptiveLoadBalancingRouter(HeapMetricsSelector),
        ClusterRouterSettings(
        totalInstances = 20, maxInstancesPerNode = 10,
        allowLocalRoutees = false))),
        name = "fooRouter")  
     router ! Broadcast("bar")
    }
  }
}

当消息发送时,我看到它在两个服务器节点VM中被接收,每个VM有10个actor。

我的路由器和你的路由器的区别是我没有指定本地路由,我把routeesPath换成了maxInstancesPerNode

最新更新