我正在玩 Lagom 并创建了接收源作为输入并返回案例类对象的服务:
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.lightbend.lagom.scaladsl.api.{Service, ServiceCall}
import play.api.libs.json.{Format, Json}
trait TestService extends Service {
def test(): ServiceCall[Source[String, NotUsed], ResultData]
override final def descriptor = {
import Service._
named("DocsStore")
.withCalls(
call(test())
)
}
}
case class ResultData(uploadId: String, length: Long)
object ResultData {
implicit val format: Format[ResultData] = Json.format[ResultData]
}
服务实现为:
class TestServiceImpl()(
implicit val materializer: Materializer,
implicit val ec: ExecutionContext
) extends TestService {
val logger = Logger(getClass.getName)
override def test(): ServiceCall[Source[String, NotUsed], ResultData] = ServiceCall{ source=>
source.runForeach(s=>logger.info(s"String $s")).map(_=>ResultData("TestResult", 12))
}
}
当我从 Play 应用程序的控制器调用此服务时:
def test = Action.async { req=>
testService.test().invoke(Source("A"::"B"::"C"::Nil)).map(rd=>Ok(Json.toJson(rd)))
}
服务端的"runForeach"成功打印A,B,C,但服务本身不返回任何结果值(预期为ResultData("TestResult",12((导致Play应用程序抛出异常:
play.api.http.HttpErrorHandlerExceptions$$anon$1: Execution exception[[DeserializationException: No content to map due to end-of-input
at [Source: akka.util.ByteIterator$ByteArrayIterator$$anon$1@309c63af; line: 1, column: 0]]]
at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:293)
at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:220)
at play.api.GlobalSettings$class.onError(GlobalSettings.scala:160)
at play.api.DefaultGlobal$.onError(GlobalSettings.scala:188)
at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:100)
at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:100)
at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:99)
at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:346)
at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:345)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
如何解决这个问题?
发生这种情况是因为 Lagom 将流的完成解释为关闭连接的信号。在发送响应之前,连接已关闭。
这在 GitHub 中作为一个问题提出:https://github.com/lagom/lagom/issues/814
一种可能的解决方法是在收到响应之前保持流打开状态,如有关测试流服务的文档所示:
// Use a source that never terminates (concat Source.maybe) so we
// don't close the upstream, which would close the downstream
val input = Source("A"::"B"::"C"::Nil).concat(Source.maybe)
但是,如果使用此策略,则还需要更改服务实现,因为上述问题中的实现仅在流完成时发送响应。相反,您需要在协议中设计一条显式完成消息,该消息向服务发送响应。