我有以下代码段:
case class SomeClass(param1:String,param2:String,param3:String)
val someClassActorSource: Source[SomeClass, ActorRef] = Source
.actorPublisher[SomeClass](Props[SomeClassActorPublisher])
val someFlow: ActorRef = Flow[SomeClass]
.mapAsync(3)(f=> getDocumentById(f))
.map(f =>{
val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8000/test")
.withEntity(ContentTypes.`text/xml(UTF-8)`, ByteString(f.a)
)
(request,request)
}).via(connection)
//Parsing Response
.mapAsync(3){
case (Success(HttpResponse(status, _, entity, _)),request)=>
entity.dataBytes.runFold(ByteString(""))(_ ++ _)
}
.map(resp =>parse(resp.utf8String,?????????????) )
.to(Sink.someSink{....})
.runWith(someClassActorSource)
def parse(resp:String,parseParam:String)=????
以及我要发送消息的代码中的某个地方:
someflow ! SomeClass("a","b","c")
someflow ! SomeClass("a1","b1","c1")
我的问题是方法解析应使用原始案例类中的param2
因此,对于第一条消息,应该是
parse(response,"b")
,对于第二个消息,应该是
parse(response,"b1")
所以问题是,我如何从提交给流的方法中获取参数?
假设您的 connection
值是通过
val connection = Http().cachedHostConnectionPool(...)
您可以使用一个事实,即连接带有元组,而不是简单地将request
传递在元组中,而是可以在输入的SomeClass
中传递。此SomeClass
实例必须遍历您的每个Flow
值才能使其进入解析阶段。
稍微修改代码:
val getDocumentFlow =
Flow[SomeClass].mapAsync(3)(f => getSomDocumentById(f).map( d => d -> f))
您的问题不从getDocumentById
陈述返回类型,因此我只是使用Document
:
val documentToRequest =
Flow[(Document, SomeClass)] map { case (document, someClass) =>
val request = ...
(request, someClass)
}
val parseResponse =
Flow[(Try[HttpResponse], SomeClass)].mapAsync(3){
case (Success(HttpResponse(status, _, entity, _)), someClass) =>
entity
.dataBytes
.runFold(ByteString(""))(_ ++ _)
.map(e => e -> someClass)
}
val parseEntity = Flow[(ByteString, SomeClass)] map {
case (entity, someClass) => parse(entity.utf8String, someClass)
}
然后可以按照问题所述使用这些流量:
val someFlow =
someClassActorSource
.via(getDocumentFlow)
.via(documentToRequest)
.via(connection)
.via(parseResponse)
.via(parseEntity)
.to(Sink.someSink{...})
.run()