合并 scalaz 流输入进程似乎在 stdin 上"wait"



>我有一个简单的程序:

import scalaz._
import stream._
object Play extends App {
  val in1 = io.linesR("C:/tmp/as.txt")
  val in2 = io.linesR("C:/tmp/bs.txt")
  val p = (in1 merge in2) to io.stdOutLines
  p.run.run
}

文件as.txt包含 5 个a,文件bs.txt包含 3 个b s。我看到这种输出:

a
b
b
a
a
b
a
a
a

但是,当我更改in2声明时,如下所示:

val in2 = io.stdInLines

然后我得到了我认为是意想不到的行为。根据文档 1,程序应该根据哪个流更快地提供东西,不确定地从每个流中提取数据。这应该意味着我看到一堆a立即打印到控制台上,但这根本不是发生的事情。

事实上,直到我按下ENTER,什么也没发生。很明显,如果我随机选择一个流来获取下一个元素,然后,如果该流阻塞,合并的进程也会阻塞(即使另一个流包含数据),则行为看起来很像我所期望的。

这是怎么回事?

1 - 好吧,好吧,文档很少,但 Dan Spiewak 在他的演讲中非常清楚地表示,它会抓住第一个提供数据的人

问题出在stdInLines的实现上。它是阻塞的,它永远不会Task.fork线程。

尝试将stdInLines的嵌入更改为此:

def stdInLines: Process[Task,String] =
    Process.repeatEval(Task.apply { 
    Option(scala.Console.readLine())
    .getOrElse(throw Cause.Terminated(Cause.End))
})

原始io.stdInLines在同一线程中运行readLine(),因此它始终在那里等待,直到您键入内容。

最新更新