>我有一个简单的程序:
import scalaz._
import stream._
object Play extends App {
val in1 = io.linesR("C:/tmp/as.txt")
val in2 = io.linesR("C:/tmp/bs.txt")
val p = (in1 merge in2) to io.stdOutLines
p.run.run
}
文件as.txt
包含 5 个a
,文件bs.txt
包含 3 个b
s。我看到这种输出:
a
b
b
a
a
b
a
a
a
但是,当我更改in2
声明时,如下所示:
val in2 = io.stdInLines
然后我得到了我认为是意想不到的行为。根据文档 1,程序应该根据哪个流更快地提供东西,不确定地从每个流中提取数据。这应该意味着我看到一堆a
立即打印到控制台上,但这根本不是发生的事情。
事实上,直到我按下ENTER
,什么也没发生。很明显,如果我随机选择一个流来获取下一个元素,然后,如果该流阻塞,合并的进程也会阻塞(即使另一个流包含数据),则行为看起来很像我所期望的。
这是怎么回事?
1 - 好吧,好吧,文档很少,但 Dan Spiewak 在他的演讲中非常清楚地表示,它会抓住第一个提供数据的人
问题出在stdInLines
的实现上。它是阻塞的,它永远不会Task.fork
线程。
尝试将stdInLines
的嵌入更改为此:
def stdInLines: Process[Task,String] =
Process.repeatEval(Task.apply {
Option(scala.Console.readLine())
.getOrElse(throw Cause.Terminated(Cause.End))
})
原始io.stdInLines
在同一线程中运行readLine()
,因此它始终在那里等待,直到您键入内容。