我正在尝试在akka/scala中写下一个调用HTTP REST API的演员,并将结果发送回到调用演员。API可能会返回必须首先将其转换为内部供应商中性格式的结果收集/向量,以便将来可以更改供应商,而无需进行代码上的太多更改。大多数代码都在起作用,但我不确定如何解开包装并发送内部向量。
这是我拥有的代码,它将Promise
返回到调用演员。我想返回的是最终map
操作中创建的实际向量:
class RESTActor extends Actor with ActorLogging with JsonSupport {
final implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system))
val http = Http(context.system)
import akka.pattern.pipe
import context.dispatcher
override def receive: Receive = {
case query: String => {
val requester = sender
var uri = Uri(Settings.autoCompleteURL).withQuery(Query(Map("query" -> query)))
sender! http
.singleRequest(HttpRequest(HttpMethods.GET, uri = uri))
.flatMap(response =>
response.status match {
case status if status.isSuccess() => Unmarshal(response.entity).to[VendorResponse].map(_.result.map(x => VendorNeutralResponse(x.id, x.field)))
case _ => response.entity.toStrict(5.seconds).map { entity =>
val body = entity.data.decodeString("UTF-8")
log.warning(errorMessage(response, body))
Left(errorMessage(response, body))
}
})
}
}
呼叫演员:
pathPrefix("search") {
get {
parameter("query") { query =>
{
complete(restActor.ask(query)) //Doesn't work as the reply is a promise
}
}
}
}
如何更改RECTACTOR中的上述代码以将实际结果发送回去,而不是将来或承诺?
edit :根据我自己的研究和 @michał和 @cyrille-corpet的建议更改代码后,以下代码有效:
pathPrefix("search") {
get {
parameter("query") { query =>
{
onComplete(gisRouter.ask(query)) {
case Success(resp: Future[Vector[VendorNeutralResponse]]) => {
resp.map(println)
complete("ok")
}
case Failure(e) => complete(e.toString)
}
}
}
}
}
看来我仍在获得future
作为演员的回应。我如何让演员使用实际数据而不是Future
?
您可以使用onComplete
功能,该功能将Future[T]
作为输入并返回Directive1[Try[T]]
,因此您可以按以下方式使用:
pathPrefix("search") {
get {
parameter("query") { query =>
onComplete(restActor.ask(query)) {
case Success(resp) => ...
case Failure(e) => ...
}
}
}
}
edit 关于从您的演员返回,您应该将http.singleRequest
的结果输送到发件人,而不是告诉它:
http.singleRequest(...).flatMap(...) pipeTo requester
这样,只有在解决Future
后才完成实际的告诉(!
(。
您可以使用地图类似于monadic:
pathPrefix("search") {
get {
parameter("query") { query =>
restActor.ask(query) map {
case Success(resp) => ...
case Failure(e) => ...
}
}
}
}
编辑:当前您的演员会响应未来。尝试对其进行重构,以便返回未包装的值:
class RESTActor extends Actor with ActorLogging with JsonSupport {
final implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system))
val http = Http(context.system)
import akka.pattern.pipe
import context.dispatcher
private def handleHttpResponse = {
case status if status.isSuccess() => Unmarshal(response.entity).to[VendorResponse].map(_.result.map(x => VendorNeutralResponse(x.id, x.field)))
case _ => response.entity.toStrict(5.seconds).map { entity =>
val body = entity.data.decodeString("UTF-8")
log.warning(errorMessage(response, body))
Left(errorMessage(response, body))
}
}
override def receive: Receive = {
case query: String => {
val requester = sender
var uri = Uri(Settings.autoCompleteURL).withQuery(Query(Map("query" -> query)))
http.singleRequest(HttpRequest(HttpMethods.GET, uri = uri)).flatMap(response =>
response.status map handleHttpResponse ) pipeTo requester
}
}