使用scalaz-stream是否可以拆分/分叉然后重新加入流?
举个例子,假设我有以下函数
val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)
val sumOfEvenNumbers = streamOfNumbers.filter(isEven).fold(0)(add)
val sumOfOddNumbers = streamOfNumbers.filter(isOdd).fold(0)(add)
zip( sumOfEven, sumOfOdd ).to( someEffectfulFunction )
对于 scalaz-stream,在此示例中,结果将如您所期望的那样 - 从 1 到 10 的数字元组传递到接收器。
但是,如果我们用需要 IO 的东西替换streamOfNumbers
,它实际上会执行 IO 操作两次。
使用Topic
我可以创建一个发布/订阅进程,该进程可以正确复制流中的元素,但是它不会缓冲 - 它只是尽可能快地消耗整个源,而不管下沉消耗的速度如何。
我可以将其包装在有界队列中,但是最终结果感觉比需要的要复杂得多。
有没有更简单的方法可以在 scalaz-stream 中拆分流,而无需从源中重复 IO 操作?
还要澄清前面的答案 delas 与"拆分"要求。特定问题的解决方案可能不需要拆分流:
val streamOfNumbers : Process[Task,Int] = Process.emitAll(1 to 10)
val oddOrEven: Process[Task,Int/Int] = streamOfNumbers.map {
case even if even % 2 == 0 => right(even)
case odd => left(odd)
}
val summed = oddOrEven.pipeW(sump1).pipeO(sump1)
val evenSink: Sink[Task,Int] = ???
val oddSink: Sink[Task,Int] = ???
summed
.drainW(evenSink)
.to(oddSink)
您也许仍然可以使用 topic,并确保子进程在您推送到主题之前订阅。
但是请注意,此解决方案没有任何限制,即如果您推得太快,您可能会遇到 OOM 错误。
def split[A](source:Process[Task,A]): Process[Task,(Process[Task,A], Proces[Task,A])]] = {
val topic = async.topic[A]
val sub1 = topic.subscribe
val sub2 = topic.subscribe
merge.mergeN(Process(emit(sub1->sub2),(source to topic.publish).drain))
}
我同样需要这个功能。我的情况相当棘手,不允许我以这种方式解决它。
感谢丹尼尔·斯皮瓦克(Daniel Spiewak)在此线程中的回应,我能够使以下内容正常工作。我通过添加onHalt
改进了他的解决方案,因此一旦Process
完成,我的应用程序就会退出。
def split[A](p: Process[Task, A], limit: Int = 10): Process[Task, (Process[Task, A], Process[Task, A])] = {
val left = async.boundedQueue[A](limit)
val right = async.boundedQueue[A](limit)
val enqueue = p.observe(left.enqueue).observe(right.enqueue).drain.onHalt { cause =>
Process.await(Task.gatherUnordered(Seq(left.close, right.close))){ _ => Halt(cause) }
}
val dequeue = Process((left.dequeue, right.dequeue))
enqueue merge dequeue
}