考虑经典的"Word Count"程序。它计算某个目录中所有文件的字数。Master接收一些目录,并在Worker参与者之间分割作业(每个Worker参与者处理一个文件)。这是伪代码:
class WordCountWorker extends Actor {
def receive = {
case FileToCount(fileName:String) =>
val count = countWords(fileName)
sender ! WordCount(fileName, count)
}
}
class WordCountMaster extends Actor {
def receive = {
case StartCounting(docRoot) => // sending each file to worker
val workers = createWorkers()
fileNames = scanFiles(docRoot)
sendToWorkers(fileNames, workers)
case WordCount(fileName, count) => // aggregating results
...
}
}
但是我想按时间表运行这个单词计数程序(例如每1分钟),提供不同的目录来扫描。
和Akka提供了一个很好的方式来调度消息传递:
system.scheduler.schedule(0.seconds, 1.minute, wordCountMaster , StartCounting(directoryName))
但是上述调度程序的问题开始于调度程序按tick发送新消息,但之前的消息尚未处理(例如我发送消息扫描某个大目录,1秒后我发送另一个消息扫描另一个目录,因此第一个目录的处理操作尚未完成)。因此,我的WordCountMaster
将从正在处理不同目录的工作程序接收WordCount
消息。
作为一个解决方案,而不是调度消息发送,我可以调度一些代码块的执行,这将创建每次新的 WordCountMaster
。即一个目录=一个WordCountMaster
。但我认为这是低效的,而且我需要注意为WordCountMaster
提供唯一的名称,以避免InvalidActorNameException
。
WordCountMaster
,我在上面的段落中提到的?或者有更好的想法/模式如何重新设计这个程序来支持调度?一些更新:在每个目录创建一个Master actor的情况下,我遇到了一些问题:
- 演员命名问题
InvalidActorNameException:演员名[WordCountMaster]不唯一!
和
InvalidActorNameException: actor名称[WordCountWorker]不存在独一无二的!
我可以克服这个问题,只是不提供演员的名字。但在这种情况下,我的演员接收自动生成的名称,如$a
, $b
等。这对我不好。
- 配置错误:
我想把路由器的配置排除到application.conf
。也就是说,我想为每个WordCountWorker
路由器提供相同的配置。但是,由于我不控制演员的名字,我不能使用下面的配置,因为我不知道演员的名字:
/wordCountWorker{
router = smallest-mailbox-pool
nr-of-instances = 5
dispatcher = word-counter-dispatcher
}
我不是Akka专家,但我认为每个聚合有一个参与者的方法并不是低效的。您需要以某种方式将并发聚合分开。你可以给每个聚合一个id,让它们在唯一的主actor中被id分开,或者你可以使用Akka actor命名和活循环逻辑,并将每个计数轮的每个聚合委托给一个只针对该聚合逻辑活的actor。
对我来说,每个聚合使用一个参与者似乎更优雅。
还请注意,Akka有一个这里描述的聚合模式的实现
就我个人而言,我根本不会使用actor来解决这个聚合问题,但无论如何,这里。
我不认为有一个合理的方法来处理多个目录同时字数统计的方式,你建议的。相反,你应该有一个"master-master"角色来监督计数器。所以,你有三个actor类:
- FileCounter:它接收一个文件来读取并处理它。当它完成后,它将把结果返回给发送者。
- CounterSupervisor:这个跟踪哪个FileCounter已经完成了他们的工作,并将结果发送回WordCountForker。
- WordCountForker:这个参与者将跟踪哪个子系统完成了他们的任务,如果他们都很忙,创建一个新的CounterSupervisor来处理这个问题。
文件计数器必须是最容易写的。
class FileCounter() extends Actor with ActorLogging {
import context.dispatcher
override def preStart = {
log.info("FileCounter Actor initialized")
}
def receive = {
case CountFile(file) =>
log.info("Counting file: " + file.getAbsolutePath)
FileIO.readFile(file).foreach { data =>
val words = data
.split("n")
.map { _.split(" ").length }
.sum
context.parent ! FileCount(words)
}
}
}
现在是监督文件计数器的actor
class CounterSupervisor(actorPool: Int) extends Actor with ActorLogging {
var total = 0
var files: Array[File] = _
var pendingActors = 0
override def preStart = {
for(i <- 1 to actorPool)
context.actorOf(FileCounter.props(), name = s"counter$i")
}
def receive = {
case CountDirectory(base) =>
log.info("Now counting starting from directory : " + base.getAbsolutePath)
total = 0
files = FileIO.getAllFiles(base)
pendingActors = 0
for(i <- 1 to actorPool if(i < files.length)) {
pendingActors += 1
context.child(s"counter$i").get ! CountFile(files.head)
files = files.tail
}
case FileCount(count) =>
total += count
pendingActors -= 1
if(files.length > 0) {
sender() ! CountFile(files.head)
files = files.tail
pendingActors += 1
} else if(pendingActors == 0) {
context.parent ! WordCountTotal(total)
}
}
}
然后是监督监制的演员。
class WordCountForker(counterActors: Int) extends Actor with ActorLogging {
var busyActors: List[(ActorRef, ActorRef)] = Nil
var idleActors: List[ActorRef] = _
override def preStart = {
val first = context.actorOf(CounterSupervisor.props(counterActors))
idleActors = List(first)
log.info(s"Initialized first supervisor with $counterActors file counters.")
}
def receive = {
case msg @ CountDirectory(dir) =>
log.info("Count directory received")
val counter = idleActors match {
case Nil =>
context.actorOf(CounterSupervisor.props(counterActors))
case head :: rest =>
idleActors = rest
head
}
counter ! msg
busyActors = (counter, sender()) :: busyActors
case msg @ WordCountTotal(n) =>
val path = sender().path.toString()
val index = busyActors.indexWhere { _._1.path.toString == path }
val (counter, replyTo) = busyActors(index)
replyTo ! msg
idleActors = counter :: idleActors
busyActors = busyActors.patch(index, Nil, 1)
}
}
我留下了一些部分的答案,以保持它尽可能简洁,如果你想看到我发布了一个Gist的其余代码。
另外,关于您对效率的关注,这里的解决方案将防止每个目录有一个子系统,但是如果有需要,您仍然会生成多个子系统。
您应该在worker中使用成为/不成为功能。如果你的worker开始扫描大文件夹,使用become
来改变actor的行为,忽略另一个消息(或响应不处理它),在目录扫描后发送消息与字数计数和unbecome
到标准行为。
首先。对于命名问题:只需动态且唯一地命名您的角色,就像这样:
WorkerActor + "-" + filename…或…MasterActor + "-" + directoryName
还是我错过了什么?
其次,为什么要调度?当第一个目录完成后,开始处理下一个目录不是更合乎逻辑吗?如果日程安排是必要的,那么我看到了许多不同的解决方案来解决你的问题,我会尝试解决其中的一些:
1。
三级层次结构:
MasterActor -> DirectoryActor -> WorkerActor
为每个新目录创建一个新的目录actor,为每个文件创建一个新的worker。
2。两级层次结构:
MasterActor -> WorkerActor
为每个文件创建一个新的worker。
用于标识接收到的结果的两个选项:
a)通过询问向工人分发工作,并通过期货汇总结果
b)在作业中包含消息ID(例如目录名)
3。具有负载平衡的两级层次结构:
与选项2相同,但您不为每个文件创建新的worker,您有固定数量的worker,或者使用平衡调度程序,或者使用最小的邮箱路由器。
4。带有期货的一级层次结构:
大师级的演员没有孩子,他只工作,只考虑未来。
我还建议阅读一下Gregor Raýman在他的回答中建议的Akka聚合模式。