FS2到FS2在第一个流结束时关闭所有资源



假设我们有2个fs2流:

val stream1 = fs2.Stream.bracket(IO { println("Acquire 1"); 2})(_ => IO { println("Release 1") })
.flatMap(p => fs2.Stream.range(1,p))
val stream2 = fs2.Stream.bracket(IO { println("Acquire 2"); 4})(_ => IO { println("Release 2") })
.flatMap(p => fs2.Stream.range(1,p))

我想相互连接:

def connect[F[_]]: (fs2.Stream[F, Int], fs2.Stream[F, Int]) => fs2.Stream[F, Int] = {
def go(stream1: fs2.Stream[F, Int], stream2: fs2.Stream[F, Int]): Pull[F, Int, Unit] =
stream1.pull.uncons1.flatMap { stream1Element =>
stream2.pull.uncons1.flatMap { stream2Element =>
(stream1Element, stream2Element) match {
case (Some((stream1Head, stream1Tail)), Some((stream2Head, stream2Tail))) =>
println("Some, Some")
Pull.output1(stream1Head + stream2Head) >> go(stream1Tail, stream2Tail)
case (Some((stream1Head, stream1Tail)), None) =>
println("1 Stream still available")
Pull.output1(stream1Head) >> go(fs2.Stream.empty, stream1Tail)
case (None, Some((stream2Head, stream2Tail))) =>
println("2 Stream still available")
Pull.output1(stream2Head) >> go(fs2.Stream.empty, stream2Tail)
case _ => Pull.output1(-1)
}
}
}
(one, two) => go(one, two).stream

}

现在检查日志我看到:

Acquire 1
Acquire 2
Some, Some
Release 2
Release 1
2 Stream still available
2 Stream still available

这对我来说有点奇怪,因为似乎一旦第一个流完成,第二个流的资源也会关闭。假设现在资源是到数据库的连接,那么第二个流中的元素就不能再提取了。

这是正确的行为吗?有什么方法可以避免关闭第二个流的资源吗?令人惊讶的是,如果第一个流比第二个流有更多的元素,那么一切都如预期的那样工作(当第二流完成时,流1的资源没有关闭(

通过检查zipAllWith函数的实现,我发现在这种情况下确实应该避免unsu1。最终的解决方案是使用stepLeg函数,而不是unsu1。所以上面的函数应该是这样的:

def connect[F[_]]: (fs2.Stream[F, Int], fs2.Stream[F, Int]) => fs2.Stream[F, Int] = {
def go(stream1: fs2.Stream[F, Int], stream2: fs2.Stream[F, Int]): Pull[F, Int, Unit] =
stream1.pull.stepLeg.flatMap { stream1Element =>
stream2.pull.stepLeg.flatMap { stream2Element =>
(stream1Element, stream2Element) match {
case (Some(sl1), Some(sl2)) =>
println("Some, Some")
val one = sl1.head(0)
val two = sl2.head(0)
Pull.output1(one + two) >> go(sl1.stream, sl2.stream)
case (Some(sl1), None) =>
val one = sl1.head(0)
println("1 Stream still available")
Pull.output1(one) >> go(sl1.stream, fs2.Stream.empty)
case (None, Some(sl2)) =>
val two = sl2.head(0)
println("2 Stream still available")
Pull.output1(two) >> go(fs2.Stream.empty, sl2.stream)
case _ => Pull.output1(-1)
}
}
}
(one, two) => {
go(one.flatMap(fs2.Stream.emit), two.flatMap(fs2.Stream.emit)).stream
}
}

日志:

Acquire 1
Acquire 2
Some, Some
Release 1
2 Stream still available
2 Stream still available
Release 2

这个问题的另一个例子可以在这里找到:unc与stepLeg

相关内容

  • 没有找到相关文章

最新更新