Akka流 Akka HTTP通过流程参数



我有以下代码段:

    case class SomeClass(param1:String,param2:String,param3:String)
    val someClassActorSource: Source[SomeClass, ActorRef] = Source
      .actorPublisher[SomeClass](Props[SomeClassActorPublisher])
    val someFlow: ActorRef = Flow[SomeClass]
        .mapAsync(3)(f=> getDocumentById(f))
        .map(f =>{
          val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8000/test")
            .withEntity(ContentTypes.`text/xml(UTF-8)`, ByteString(f.a)
            )
          (request,request)
        }).via(connection)
        //Parsing Response
        .mapAsync(3){
          case (Success(HttpResponse(status, _, entity, _)),request)=>
            entity.dataBytes.runFold(ByteString(""))(_ ++ _)
        }
        .map(resp =>parse(resp.utf8String,?????????????) )
        .to(Sink.someSink{....})
        .runWith(someClassActorSource)
    def parse(resp:String,parseParam:String)=????

以及我要发送消息的代码中的某个地方:

someflow ! SomeClass("a","b","c")
someflow ! SomeClass("a1","b1","c1")

我的问题是方法解析应使用原始案例类中的param2

因此,对于第一条消息,应该是

  parse(response,"b")

,对于第二个消息,应该是

  parse(response,"b1")

所以问题是,我如何从提交给流的方法中获取参数?

假设您的 connection值是通过

实例化的
val connection = Http().cachedHostConnectionPool(...)

您可以使用一个事实,即连接带有元组,而不是简单地将request传递在元组中,而是可以在输入的SomeClass中传递。此SomeClass实例必须遍历您的每个Flow值才能使其进入解析阶段。

稍微修改代码:

val getDocumentFlow = 
  Flow[SomeClass].mapAsync(3)(f => getSomDocumentById(f).map( d => d -> f))

您的问题不从getDocumentById陈述返回类型,因此我只是使用Document

val documentToRequest = 
  Flow[(Document, SomeClass)] map { case (document, someClass) =>
    val request = ...
    (request, someClass)
  }
val parseResponse = 
  Flow[(Try[HttpResponse], SomeClass)].mapAsync(3){
    case (Success(HttpResponse(status, _, entity, _)), someClass) =>
      entity
        .dataBytes
        .runFold(ByteString(""))(_ ++ _)
        .map(e => e -> someClass)
  }
val parseEntity = Flow[(ByteString, SomeClass)] map { 
  case (entity, someClass) => parse(entity.utf8String, someClass)
}

然后可以按照问题所述使用这些流量:

val someFlow = 
  someClassActorSource
    .via(getDocumentFlow)
    .via(documentToRequest)
    .via(connection)
    .via(parseResponse)
    .via(parseEntity)
    .to(Sink.someSink{...})
    .run()

最新更新