Akka http 请求和响应模式



我有一个要求,客户端调用通过akka http创建的postREST端点。一旦请求在 post 方法中,我需要将 post 对象传递给流(由源、多个流和接收器等组成(并从接收器中取回响应,以便我可以将响应返回给客户端。

我一直在浏览一些文章并看到以下代码,但担心我不想为每个请求具体化流。我只想具体化一个流,并继续将元素传递给该流。

以下是我所看到的高水平:

val route: Route =
path("dummy path") { p =>
get {
(extract(_.request) & extractMaterializer) { (req, mat) ⇒
**Source.single(req).runWith(sink)(mat)**
complete {
s"<h1>Say hello to akka-http. p=$p</h1>"
}
}
}
}

我正在考虑创建一个Actor并将对象传递给该Actor。我可以从 Source.actorRef 创建一个源,并将多个流与此源连接起来。但我不确定,如何从水槽中取回响应。像这样:

val actor: ActorRef = some actor
Source.actorRef(actor).via(flows).to(Sink).run() --> materialized stream
val route: akka.http.scaladsl.server.Route =
path("post" / Segment) { p =>
post {
(extract(_.request) & extractMaterializer) { (req, mat) ⇒
response = actor.ask(message) --> get back the response
complete {
response
}
}
}
}

或者,我是否可以在我的用例中加入其他内容。

我想你想要的是使请求的处理流经流[仅具体化一次],并将响应从流发送回用户。可能是队列源,介于两者之间的Actor可以完成这项工作

import java.util.concurrent.TimeUnit
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives.{
get,
onSuccess,
pathEnd,
pathPrefix
}
import akka.pattern.ask
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.util.Timeout
import akka.http.scaladsl.server.directives.RouteDirectives.complete
import com.typesafe.config.ConfigFactory
import scala.concurrent.ExecutionContext
object TestApp2 extends App {
implicit val actorSystem = ActorSystem("test-system")
implicit val mat = ActorMaterializer()
implicit val ec = mat.executionContext
val streamSource = Source
.queue[(Message, ActorRef)](100, OverflowStrategy.dropNew)
.map { p =>
//do anything here
println("I am processing request")
("It works", p._2)
}
.toMat(Sink.foreach { resp =>
resp._2 ! resp._1
})(Keep.left)
.run()
implicit val timeout = Timeout(
10000,
TimeUnit.MILLISECONDS
)
val internalActor =
actorSystem.actorOf(Props(new InternalActor(streamSource)))
Http(actorSystem)
.bindAndHandle(
getRoutes(internalActor),
"0.0.0.0",
8080
)
def getRoutes(
internalActor: ActorRef
)(implicit mat: ActorMaterializer, ec: ExecutionContext, timeout: Timeout) = {
pathPrefix("healthcheck") {
get {
pathEnd {
val responseReturned = internalActor ? Message()
onSuccess(responseReturned) {
case response: String =>
complete(response)
case _ => complete("error")
}
}
}
}
}
}
case class Message()
class InternalActor(streamSource: SourceQueueWithComplete[(Message, ActorRef)])(
implicit ec: ExecutionContext
) extends Actor {
override def receive: Receive = {
case m: Message =>
val senderRef = sender()
streamSource.offer((m, senderRef)).map {
case QueueOfferResult.Enqueued => // do nothing for success
case QueueOfferResult.Dropped => senderRef ! "error" // return error in case of backpressure 
case QueueOfferResult.Failure(ex) => senderRef ! "error" // return error
case QueueOfferResult.QueueClosed => senderRef ! "error" // return error
}
}
}

卷曲"http://localhost:8080/healthcheck">

它有效

最新更新