Akka调度模式



考虑经典的"Word Count"程序。它计算某个目录中所有文件的字数。Master接收一些目录,并在Worker参与者之间分割作业(每个Worker参与者处理一个文件)。这是伪代码:

class WordCountWorker extends Actor {
  def receive = {
    case FileToCount(fileName:String) =>
      val count = countWords(fileName)
      sender ! WordCount(fileName, count)
  }
}
class WordCountMaster extends Actor {
  def receive = {
    case StartCounting(docRoot) => // sending each file to worker
      val workers = createWorkers()
      fileNames = scanFiles(docRoot)
      sendToWorkers(fileNames, workers)
    case WordCount(fileName, count) => // aggregating results
      ...
  }
}

但是我想按时间表运行这个单词计数程序(例如每1分钟),提供不同的目录来扫描。

和Akka提供了一个很好的方式来调度消息传递:

system.scheduler.schedule(0.seconds, 1.minute, wordCountMaster , StartCounting(directoryName))

但是上述调度程序的问题开始于调度程序按tick发送新消息,但之前的消息尚未处理(例如我发送消息扫描某个大目录,1秒后我发送另一个消息扫描另一个目录,因此第一个目录的处理操作尚未完成)。因此,我的WordCountMaster将从正在处理不同目录的工作程序接收WordCount消息。

作为一个解决方案,而不是调度消息发送,我可以调度一些代码块的执行,这将创建每次新的 WordCountMaster。即一个目录=一个WordCountMaster。但我认为这是低效的,而且我需要注意为WordCountMaster提供唯一的名称,以避免InvalidActorNameException

所以我的问题是:我应该为每个tick创建新的WordCountMaster,我在上面的段落中提到的?或者有更好的想法/模式如何重新设计这个程序来支持调度?

一些更新:在每个目录创建一个Master actor的情况下,我遇到了一些问题:

  1. 演员命名问题

InvalidActorNameException:演员名[WordCountMaster]不唯一!

InvalidActorNameException: actor名称[WordCountWorker]不存在独一无二的!

我可以克服这个问题,只是不提供演员的名字。但在这种情况下,我的演员接收自动生成的名称,如$a, $b等。这对我不好。

  • 配置错误:
  • 我想把路由器的配置排除到application.conf。也就是说,我想为每个WordCountWorker路由器提供相同的配置。但是,由于我不控制演员的名字,我不能使用下面的配置,因为我不知道演员的名字:

      /wordCountWorker{
        router = smallest-mailbox-pool
        nr-of-instances = 5
        dispatcher = word-counter-dispatcher
      }
    

    我不是Akka专家,但我认为每个聚合有一个参与者的方法并不是低效的。您需要以某种方式将并发聚合分开。你可以给每个聚合一个id,让它们在唯一的主actor中被id分开,或者你可以使用Akka actor命名和活循环逻辑,并将每个计数轮的每个聚合委托给一个只针对该聚合逻辑活的actor。

    对我来说,每个聚合使用一个参与者似乎更优雅。

    还请注意,Akka有一个这里描述的聚合模式的实现

    就我个人而言,我根本不会使用actor来解决这个聚合问题,但无论如何,这里。

    我不认为有一个合理的方法来处理多个目录同时字数统计的方式,你建议的。相反,你应该有一个"master-master"角色来监督计数器。所以,你有三个actor类:

    • FileCounter:它接收一个文件来读取并处理它。当它完成后,它将把结果返回给发送者。
    • CounterSupervisor:这个跟踪哪个FileCounter已经完成了他们的工作,并将结果发送回WordCountForker。
    • WordCountForker:这个参与者将跟踪哪个子系统完成了他们的任务,如果他们都很忙,创建一个新的CounterSupervisor来处理这个问题。

    文件计数器必须是最容易写的。

    class FileCounter() extends Actor with ActorLogging {
        import context.dispatcher
        override def preStart = {
            log.info("FileCounter Actor initialized")
        }
        def receive = {
            case CountFile(file) =>
                log.info("Counting file: " + file.getAbsolutePath)
                FileIO.readFile(file).foreach { data =>
                    val words = data
                        .split("n")
                        .map { _.split(" ").length }
                        .sum
                    context.parent ! FileCount(words)
                }
        }
    }
    

    现在是监督文件计数器的actor

    class CounterSupervisor(actorPool: Int) extends Actor with ActorLogging {
        var total = 0
        var files: Array[File] = _
        var pendingActors = 0
        override def preStart = {
            for(i <- 1 to actorPool)
                context.actorOf(FileCounter.props(), name = s"counter$i")
        }
        def receive = {
            case CountDirectory(base) =>
                log.info("Now counting starting from directory : " + base.getAbsolutePath)
                total = 0
                files = FileIO.getAllFiles(base)
                pendingActors = 0
                for(i <- 1 to actorPool if(i < files.length)) {
                    pendingActors += 1
                    context.child(s"counter$i").get ! CountFile(files.head)
                    files = files.tail
                }
            case FileCount(count) =>
                total += count
                pendingActors -= 1
                if(files.length > 0) {
                    sender() ! CountFile(files.head)
                    files = files.tail
                    pendingActors += 1
                } else if(pendingActors == 0) {
                    context.parent ! WordCountTotal(total)
                }
        }
    }
    

    然后是监督监制的演员。

    class WordCountForker(counterActors: Int) extends Actor with ActorLogging {
        var busyActors: List[(ActorRef, ActorRef)] = Nil
        var idleActors: List[ActorRef] = _
        override def preStart = {
            val first = context.actorOf(CounterSupervisor.props(counterActors))
            idleActors = List(first)
            log.info(s"Initialized first supervisor with $counterActors file counters.")
        }
        def receive = {
            case msg @ CountDirectory(dir) =>
                log.info("Count directory received")
                val counter = idleActors match {
                    case Nil =>
                        context.actorOf(CounterSupervisor.props(counterActors))
                    case head :: rest =>
                        idleActors = rest
                        head
                }
                counter ! msg
                busyActors = (counter, sender()) :: busyActors
            case msg @ WordCountTotal(n) =>
                val path = sender().path.toString()
                val index = busyActors.indexWhere { _._1.path.toString == path }
                val (counter, replyTo) = busyActors(index)
                replyTo ! msg
                idleActors = counter :: idleActors
                busyActors = busyActors.patch(index, Nil, 1)
        }
    }
    

    我留下了一些部分的答案,以保持它尽可能简洁,如果你想看到我发布了一个Gist的其余代码。

    另外,关于您对效率的关注,这里的解决方案将防止每个目录有一个子系统,但是如果有需要,您仍然会生成多个子系统。

    您应该在worker中使用成为/不成为功能。如果你的worker开始扫描大文件夹,使用become来改变actor的行为,忽略另一个消息(或响应不处理它),在目录扫描后发送消息与字数计数和unbecome到标准行为。

    首先。对于命名问题:只需动态且唯一地命名您的角色,就像这样:
    WorkerActor + "-" + filename…或…MasterActor + "-" + directoryName
    还是我错过了什么?

    其次,为什么要调度?当第一个目录完成后,开始处理下一个目录不是更合乎逻辑吗?如果日程安排是必要的,那么我看到了许多不同的解决方案来解决你的问题,我会尝试解决其中的一些:

    1。
    三级层次结构:
    MasterActor -> DirectoryActor -> WorkerActor
    为每个新目录创建一个新的目录actor,为每个文件创建一个新的worker。


    2。两级层次结构:
    MasterActor -> WorkerActor
    为每个文件创建一个新的worker。
    用于标识接收到的结果的两个选项:
    a)通过询问向工人分发工作,并通过期货汇总结果
    b)在作业中包含消息ID(例如目录名)


    3。具有负载平衡的两级层次结构:
    与选项2相同,但您不为每个文件创建新的worker,您有固定数量的worker,或者使用平衡调度程序,或者使用最小的邮箱路由器。


    4。带有期货的一级层次结构:
    大师级的演员没有孩子,他只工作,只考虑未来。

    我还建议阅读一下Gregor Raýman在他的回答中建议的Akka聚合模式。

    最新更新