我在我的 Akka 经典项目中使用以下代码片段。
(persistence ? Persist("a", Some(100), 123)) (100.milliseconds)
.mapTo[Persisted]
.recover(ex => Failure(ex.getMessage()))
.pipeTo(self)
当我处理此ask
的成功响应时,发件人self
。如何获取此ask
响应的发送者?换句话说,如何在此处获取目标/接收者参与者的地址?
编辑:
很明显,persistence
是询问的目标。但我真正想知道的是,是否有办法通过pipeTo
提供persistence
地址。假设有一系列persistence
,我不知道哪个Persist
消息来自哪个persistence
。
答案在你的问题中。persistence
是询问的目标。
persistence
可能会将工作委托给另一个参与者(例如池中的工作线程),但这会显示实现细节,并且 tbh 可能不会那么有用(这是 ask 模式不传播发送者的部分原因)。
如果它是你控制的协议,则向响应显式添加ActorRef
可以保证正常工作。
您可以滚动自己的 ask 模式版本来传播发送者,尽管如上所述,它可能不会那么有用。
编辑:要将persistence
传播到转发的回复中,最简单的方法是将询问的Future
结果map
为将persistence
与结果捆绑在一起的东西(作为一种相关ID),例如:
(persistence ? Persist("a", Some(100), 123)) (100.milliseconds)
.mapTo[Persisted]
.map { response => persistence -> response }
.recoverWith { ex => persistence -> Failure(ex.getMessage) }
.pipeTo(self)
发送的消息将是ActorRef
的元组,要么是Persisted
,要么是Failure
,所以你会在接收中将它们匹配成类似的东西
case (persistenceTarget, Persisted(...)) => ???
case (persistenceTarget, Failure(msg)) => ???
(你也可以显式地将协议更改为包含ActorRef
的东西:元组可能有点太原始了,但对于这个答案很方便,而无需了解有关协议的更多详细信息)
请注意,如果persistence
是Actor
中的一个字段,则在发送询问和执行map
/recoverWith
时之间可能会发生变化:map
/recoverWith
最终将拾取更改的值。 这种无意中"关闭"的演员状态是 Akka 中令人讨厌的错误的长期来源,因此可能值得拥有一个
val persistenceTarget = persistence
并将 ask/map
/recoverWith
中提及的persistence
替换为persistenceTarget
:由于persistenceTarget
是一个本地(且不可变)的值,因此关闭是安全的。