当使用参与者互操作从 s3 流式传输文件时如何处理背压



>我正在尝试从 S3 下载一个大文件并将其数据发送给正在执行 http 请求的另一个参与者,然后保留响应。我想限制该参与者发送的请求数量,因此我需要处理背压.
我尝试做这样的事情:

S3.download(bckt, bcktKey).map{
case Some((file, _)) =>
file
.via(CsvParsing.lineScanner())
.map(_.map(_.utf8String)).drop(1)//drop headers
.map(p => Foo(p.head, p(1)))
.mapAsync(30) { p =>
implicit val askTimeout: Timeout = Timeout(10 seconds)
(httpClientActor ? p).mapTo[Buzz]
}
.mapAsync(1){
case b@Buzz(_, _) =>
(persistActor ? b).mapTo[Done]
}.runWith(Sink.head)

问题是我看到它只从文件中读取 30 行作为并行性的限制。我不确定这是实现我正在寻找的目标的正确方法

正如 Johny 在他的评论中指出的那样,Sink.head是导致流仅处理大约 30 个元素的原因。 发生的情况大致如下:

  • Sink.head表示对 1 个元素的需求
  • 这种需求通过第二个mapAsync向上传播
  • 当需求达到第一个mapAsync时,由于该元素具有并行度 30,因此表示对 30 个元素的需求
  • CSV 解析阶段发出 30 个元素
  • 当收到来自客户端参与者的第一个元素的对 Ask 的响应时,响应将向下传播到持久参与者的请求
  • 从 CSV 解析阶段发出对另一个元素的需求信号
  • 当持久执行组件响应时,响应将转到接收器
  • 由于接收器是Sink.head的,一旦收到元素就会取消流,因此流会被拆除
  • 客户端参与者的任何已发送但正在等待响应的请求仍将得到处理

持久执行组件的响应与 CSV 解析并向客户端执行组件发送请求之间存在一些竞争:如果后者更快,则客户端执行组件可能会处理 31 行。

如果您只想在每个元素处理完毕后进行Future[Done]Sink.last将很好地使用此代码。

如果原因不是我在评论中提到的Sink.head的使用,您可以使用Sink.actorRefWithBackpressure对流进行背压。

示例代码:

class PersistActor extends Actor {
override def receive: Receive = {
case "init" =>
println("Initialized")
case "complete" =>
context.stop(self)
case message =>
//Persist Buzz??
sender() ! Done
}
}
val sink = Sink
.actorRefWithBackpressure(persistActor, "init", Done, "complete", PartialFunction.empty)
S3.download(bckt, bcktKey).map{
case Some((file, _)) =>
file
.via(CsvParsing.lineScanner())
.map(_.map(_.utf8String)).drop(1)//drop headers
.map(p => Foo(p.head, p(1)))
//You could backpressure here too...
.mapAsync(30) { p =>
implicit val askTimeout: Timeout = Timeout(10 seconds)
(httpClientActor ? p).mapTo[Buzz]
}
.to(sink)
.run()

最新更新