Akka Streams: group only the Right elements of an Either



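I have a source that emits Either[String, MyClass].

I want to call an external service with a batch of MyClass values and continue downstream with Either[String, ExternalServiceResponse], which is why I need to group the elements of the stream.

If the stream emitted only MyClass elements, this would be easy: just call grouped: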

val source: Source[MyClass, NotUsed] = <custom implementation>
source
  .grouped(10)                 // Seq[MyClass]
  .map(callExternalService(_)) // ExternalServiceResponse
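
As a rough illustration of the chunking that grouped performs, the sketch below uses grouped from the Scala standard library rather than Akka. The Akka Streams operator chunks in the same way, with the last group possibly smaller than the requested size when the stream ends. The object name GroupedDemo is made up for this example.

```scala
object GroupedDemo {
  // Standard-library analogue of Source.grouped: fixed-size chunks,
  // with a shorter final chunk when the input runs out.
  val groups: List[List[Int]] = (1 to 5).toList.grouped(2).toList

  def main(args: Array[String]): Unit =
    println(groups) // List(List(1, 2), List(3, 4), List(5))
}
```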

In my scenario, however, how can I group only the Right elements of the Either?

val source: Source[Either[String, MyClass], NotUsed] = <custom implementation>
source
  .???                                                      // Either[String, Seq[MyClass]]
  .map {
    case Right(myClasses) => callExternalService(myClasses)
    case Left(string) => Left(string)
  }                                                         // Either[String, ExternalServiceResponse]

The following approach works, but is there a more idiomatic way?

val source: Source[Either[String, MyClass], NotUsed] = <custom implementation>
source
  .groupBy(2, either => either.isRight)
  .grouped(10)
  .map(input => input.headOption match {
    case Some(Right(_)) =>
      callExternalService(input.map(item => item.right.get))
    case _ =>
      input
  })
  .mapConcat(_.to[scala.collection.immutable.Iterable])
  .mergeSubstreams

This should turn a source of Either[L, R] into a source of Either[L, Seq[R]], grouping the Rights with a configurable group size.

import akka.NotUsed
import akka.stream.scaladsl.Source

def groupRights[L, R](groupSize: Int)(in: Source[Either[L, R], NotUsed]): Source[Either[L, Seq[R]], NotUsed] =
  in.map(Option(_))              // Yep, an Option[Either[L, R]]
    .concat(Source.single(None)) // None marks the point where `in` completes
    .statefulMapConcat { () =>
      val buffer = new scala.collection.mutable.ArrayBuffer[R](groupSize)
      // Emits the buffered Rights as one group and resets the buffer.
      def dumpBuffer(): List[Either[L, Seq[R]]] = {
        val out = List(Right(buffer.toList))
        buffer.clear()
        out
      }
      incoming: Option[Either[L, R]] => {
        incoming.map { _.fold(
            l => List(Left(l)),  // unfortunate that we have to re-wrap
            r => {
              buffer += r
              if (buffer.size == groupSize) {
                dumpBuffer()
              } else {
                Nil
              }
            }
          )
        }.getOrElse(if (buffer.nonEmpty) dumpBuffer() else Nil) // End of stream: flush any partial group, never an empty one
      }
    }
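
To get a feel for what this kind of buffering emits, here is a minimal sketch that applies the same rule to a strict List instead of an Akka source, so it runs with the standard library alone. The names GroupRightsModel and groupRightsModel are hypothetical and exist only for this illustration. The model skips the final flush when the buffer is empty, so it never emits an empty group.

```scala
import scala.collection.mutable.ArrayBuffer

object GroupRightsModel {
  // Hypothetical helper: buffers Rights into groups of `groupSize`,
  // passes Lefts through immediately, and flushes a partial group at the end.
  def groupRightsModel[L, R](groupSize: Int)(in: List[Either[L, R]]): List[Either[L, Seq[R]]] = {
    val buffer = new ArrayBuffer[R](groupSize)
    val out = List.newBuilder[Either[L, Seq[R]]]

    // Emit the buffered Rights as one group, unless there is nothing buffered.
    def flush(): Unit =
      if (buffer.nonEmpty) {
        out += Right(buffer.toList)
        buffer.clear()
      }

    in.foreach {
      case Left(l) =>
        out += Left(l) // Lefts are not buffered
      case Right(r) =>
        buffer += r
        if (buffer.size == groupSize) flush()
    }
    flush() // end of input
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val in: List[Either[String, Int]] =
      List(Right(1), Left("a"), Right(2), Right(3), Left("b"))
    println(groupRightsModel(2)(in))
    // List(Left(a), Right(List(1, 2)), Left(b), Right(List(3)))
  }
}
```

Note that a Left can overtake Rights that are still waiting in the buffer: in the run above, Left(a) comes out before the group containing 1, even though 1 arrived first.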

Beyond that, I'll note that the downstream code that calls the external service can be rewritten as

.map(_.right.map(callExternalService))

If the external service can reliably be called with parallelism n, it may also be worth using

.mapAsync(n) { e =>
  e.fold(
    l => Future.successful(Left(l)),
    r => Future { Right(callExternalService(r)) } // needs an implicit ExecutionContext in scope
  )
}

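If you want to maximize throughput at the cost of preserving order, you can even replace mapAsync with mapAsyncUnordered.

You can split the source of Eithers into two branches so that the Rights are processed in their own way, then merge the two substreams back together: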

// case class MyClass(x: Int)
// case class ExternalServiceResponse(xs: Seq[MyClass])
// def callExternalService(xs: Seq[MyClass]): ExternalServiceResponse =
//    ExternalServiceResponse(xs)
// val source: Source[Either[String, MyClass], _] =
//   Source(List(Right(MyClass(1)), Left("2"), Right(MyClass(3)), Left("4"), Right(MyClass(5))))
val lefts: Source[Either[String, Nothing], _] =
  source
    .collect { case Left(l) => Left(l) }
val rights: Source[Either[Nothing, ExternalServiceResponse], _] =
  source
    .collect { case Right(x: MyClass) => x }
    .grouped(2)
    .map(callExternalService)
    .map(Right(_))
val out: Source[Either[String, ExternalServiceResponse], _] = rights.merge(lefts)
// out.runForeach(println)
// Left(2)
// Right(ExternalServiceResponse(Vector(MyClass(1), MyClass(3))))
// Left(4)
// Right(ExternalServiceResponse(Vector(MyClass(5))))
