如何将mapActors变量设置为MasterActor类中接收函数之外的全局变量?
f1
和f2
函数从MapReduceApp发送到MastorActor。
另一种方法是在MastorActor中的receive函数之前创建mapActors。然后,当稍后从MapReduceApp接收到f1
/f2
函数时,f1
和f2
可以分别发送到mapActors和reduceActions。但是如何做到这一点呢?将f1
和f2
函数传递到mapActor和reduceActor中似乎很头疼。
class MasterActor extends Actor {
var reduceActors = List[ActorRef]()
var mapActors: ActorRef = ???
def receive = {
case SetMapperReducer(f1, f2, data) =>
for (i <- 0 until 4) {
reduceActors = context.actorOf(
Props(classOf[ReduceActor], f2), name = "reduce" + i
) :: reduceActors
}
mapActors = context.actorOf(
RoundRobinPool(4).props(
Props(classOf[MapActor], reduceActors, f1)
)
)
case Flush =>
mapActors ! Broadcast(Flush)
}
map和reduce有两个类。
地图演员:
class MapActor(
reduceActors: List[ActorRef],
mf: Any => List[Tuple2[Any,Any]]
) extends Actor {
def receive = {
case MapData(data: Any) =>
println(data)
case Flush =>
println("Got flush")
}
}
class ReduceActor(rf: Any => List[Tuple2[Any,Any]]) extends Actor {
def receive = {
case Data(data: Any) =>
println(data)
case Flush =>
println("Got flush")
}
还有一个主要程序:
object MapReduceApp extends App {
val system = ActorSystem("TestApp")
val master = system.actorOf(Props[MasterActor](), name = "master")
val data = List(
("Episode 1", "Once upon a time, in a land far, far away...")
)
master ! SetMapperReducer(mapProcess1 _, reduceProcess1 _, data)
}
不要在Actors中使用var
,使用context.become
来处理状态。类似这样的东西:
class MasterActor extends Actor {
def receive = receiveMain(Nil, None)
def receiveMain(
reduceActors: List[ActorRef],
mapActor: Option[ActorRef],
): Receive = {
case SetMapperReducer(f1, f2, data) =>
val reduceActors =
(0 until 4).map { i =>
context.actorOf(
Props(classOf[ReduceActor], f2),
name = "reduce" + i
)
}.toList
val mapActors = context.actorOf(
RoundRobinPool(4).props(
Props(classOf[MapActor], reduceActors, f1)
)
)
context.become(reduceActors, Option(mapActors))
case Flush =>
mapActors.foreach(_ ! Broadcast(Flush))
}
}
似乎最初为mapActors变量创建一个TestActor类就可以了。然后,可以在案例中对其进行更改。
主要演员:
...
var mapActors = context.actorOf(
RoundRobinPool(numberMappers).props(
Props(classOf[TestActor])
)
)
def receive = {
case SetMapperReducer(f1, f2, data) =>
...
}
...
然后只创建临时TestActor类:
class TestActor() extends Actor {
def receive = {
case Flush =>
println("hi")
}
}