合并最新测试的默认值



MergeLatest的官方文件指出:

MergeLatest 为从某个输入流发出的每个元素发出列表,但仅在每个输入流发出至少一个元素之后。

我的问题是:这可以绕过吗? 例如,我们是否可以提供一个默认值,以便一旦它从任何输入流中接收到至少一个元素,它就会开始生成列表?

以下应该是新行为:

(1,0,0)
(2,0,0)
(2,1,0)
(2,1,1)
(2,1,2)

而不是:

(2,1,1)
(2,1,2)

因为我也需要将第一个列表推送到输出流

不幸的是,mergeLatest没有提供这样的选项。 而且似乎没有任何 Stream 运算符可以轻易做到这一点。 一种方法是将MergeLatest重新用于特定需求。 好消息是,必要的代码更改相当简单,因为相关的代码实现是UniformFanInShape的标准GraphStage

import akka.stream.scaladsl._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.{ Attributes, Inlet, Outlet, UniformFanInShape }
import scala.collection.immutable
object MergeLatestWithDefault {
def apply[T](inputPorts: Int, default: T, eagerComplete: Boolean = false): GraphStage[UniformFanInShape[T, List[T]]] =
new MergeLatestWithDefault[T, List[T]](inputPorts, default, eagerComplete)(_.toList)
}
final class MergeLatestWithDefault[T, M](val inputPorts: Int, val default: T, val eagerClose: Boolean)(buildElem: Array[T] => M)
extends GraphStage[UniformFanInShape[T, M]] {
require(inputPorts >= 1, "input ports must be >= 1")
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeLatestWithDefault.in" + i))
val out: Outlet[M] = Outlet[M]("MergeLatestWithDefault.out")
override val shape: UniformFanInShape[T, M] = UniformFanInShape(out, in: _*)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
private val activeStreams: java.util.HashSet[Int] = new java.util.HashSet[Int]()
private var runningUpstreams: Int = inputPorts
private def upstreamsClosed: Boolean = runningUpstreams == 0
private val messages: Array[Any] = Array.fill[Any](inputPorts)(default)
override def preStart(): Unit = in.foreach(tryPull)
in.zipWithIndex.foreach {
case (input, index) =>
setHandler(
input,
new InHandler {
override def onPush(): Unit = {
messages.update(index, grab(input))
activeStreams.add(index)
emit(out, buildElem(messages.asInstanceOf[Array[T]]))
tryPull(input)
}
override def onUpstreamFinish(): Unit = {
if (!eagerClose) {
runningUpstreams -= 1
if (upstreamsClosed) completeStage()
} else completeStage()
}
})
}
override def onPull(): Unit = {
var i = 0
while (i < inputPorts) {
if (!hasBeenPulled(in(i))) tryPull(in(i))
i += 1
}
}
setHandler(out, this)
}
override def toString = "MergeLatestWithDefault"
}

在这种情况下,只需要很少的代码更改。 除了在数组messages中填充default的附加参数外,唯一的变化是onPush内的emit不再是有条件的。

测试一下:

import akka.actor.ActorSystem
object CustomMerge {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("system")
val s1 = Source(1 to 3)
val s2 = Source(11 to 13).throttle(1, 50.millis)
val s3 = Source(101 to 103).throttle(1, 100.millis)
Source.combine(s1, s2, s3)(MergeLatestWithDefault[Int](_, 0)).runForeach(println)
}
}
// Output:
//
// List(1, 0, 0)
// List(1, 11, 0)
// List(1, 11, 101)
// List(2, 11, 101)
// List(2, 12, 101)
// List(3, 12, 101)
// List(3, 13, 101)
// List(3, 13, 102)
// List(3, 13, 103)

作为奖励,虽然mergeLatest仅在 Akka Stream2.6+ 上可用,但根据我的简短测试,这个重新利用的代码似乎在2.5上运行良好。

对于每个传入流,您可以使用Source.single(0).concat发出零,然后发出流的其余部分:

def withInitialValue[A](source: Source[A, NotUsed], a: A): Source[A, NotUsed] =
Source.single(a).concat(source)

最新更新