Lagom 服务在接收源时没有响应



我正在玩 Lagom 并创建了接收源作为输入并返回案例类对象的服务:

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.lightbend.lagom.scaladsl.api.{Service, ServiceCall}
import play.api.libs.json.{Format, Json}
trait TestService extends Service {
  def test(): ServiceCall[Source[String, NotUsed], ResultData]
  override final def descriptor = {
    import Service._
    named("DocsStore")
      .withCalls(
        call(test())
      )
  }
}

case class ResultData(uploadId: String, length: Long)
object ResultData {
  implicit val format: Format[ResultData] = Json.format[ResultData]
}

服务实现为:

class TestServiceImpl()(
  implicit val materializer: Materializer,
  implicit val ec: ExecutionContext
) extends TestService {
  val logger = Logger(getClass.getName)
  override def test(): ServiceCall[Source[String, NotUsed], ResultData] = ServiceCall{ source=>
    source.runForeach(s=>logger.info(s"String $s")).map(_=>ResultData("TestResult", 12))
  }
}

当我从 Play 应用程序的控制器调用此服务时:

  def test = Action.async { req=>
    testService.test().invoke(Source("A"::"B"::"C"::Nil)).map(rd=>Ok(Json.toJson(rd)))
  }
服务

端的"runForeach"成功打印A,B,C,但服务本身不返回任何结果值(预期为ResultData("TestResult",12((导致Play应用程序抛出异常:

play.api.http.HttpErrorHandlerExceptions$$anon$1: Execution exception[[DeserializationException: No content to map due to end-of-input
 at [Source: akka.util.ByteIterator$ByteArrayIterator$$anon$1@309c63af; line: 1, column: 0]]]
    at play.api.http.HttpErrorHandlerExceptions$.throwableToUsefulException(HttpErrorHandler.scala:293)
    at play.api.http.DefaultHttpErrorHandler.onServerError(HttpErrorHandler.scala:220)
    at play.api.GlobalSettings$class.onError(GlobalSettings.scala:160)
    at play.api.DefaultGlobal$.onError(GlobalSettings.scala:188)
    at play.api.http.GlobalSettingsHttpErrorHandler.onServerError(HttpErrorHandler.scala:100)
    at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:100)
    at play.core.server.netty.PlayRequestHandler$$anonfun$2$$anonfun$apply$1.applyOrElse(PlayRequestHandler.scala:99)
    at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:346)
    at scala.concurrent.Future$$anonfun$recoverWith$1.apply(Future.scala:345)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)

如何解决这个问题?

发生这种情况是因为 Lagom 将流的完成解释为关闭连接的信号。在发送响应之前,连接已关闭。

这在 GitHub 中作为一个问题提出:https://github.com/lagom/lagom/issues/814

一种可能的解决方法是在收到响应之前保持流打开状态,如有关测试流服务的文档所示:

// Use a source that never terminates (concat Source.maybe) so we
// don't close the upstream, which would close the downstream
val input = Source("A"::"B"::"C"::Nil).concat(Source.maybe)

但是,如果使用此策略,则还需要更改服务实现,因为上述问题中的实现仅在流完成时发送响应。相反,您需要在协议中设计一条显式完成消息,该消息向服务发送响应。

相关内容

  • 没有找到相关文章

最新更新