如何组合多个 Scalaz-Stream,以便保留完成顺序但不强制执行交错?


var num =0
var num2 = 3333
val p2 = Process.eval {
  Thread.sleep(10000)
  Task.delay {
    Thread.sleep(10000)
    num2 = num2 + 1
    s"hi ${num2}"
  }
}.repeat.take(15)
//p2: scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String] =
// Await(scalaz.concurrent.Task@5a554f1c,
//<function1>,Halt(scalaz.stream.Process$End$),Halt(scalaz.stream.Process$End$))
val p1 = Process.eval {
  Thread.sleep(2000)
  Task.delay { 
    Thread.sleep(2000)
    num = num + 1
    s"hi $num"
  }
}.repeat.take(15)
//p1: scalaz.stream.Process[[x]scalaz.concurrent.Task[x],String] = 
// Await(scalaz.concurrent.Task@7a54e904,
// <function1>,Halt(scalaz.stream.Process$End$),Halt(scalaz.stream.Process$End$))
// this interleaves them and I get disjunctions showing me their order
(p1 either p2).map(println).run.run
// this gives me the strings that are interleaved
(p1 interleave p2).map(println).run.run

你如何得到一个进程,它是 2 个进程的组合,但无论它们以什么顺序到达(这意味着如果左边在右边之前走两次,没关系,给左边两次,然后在右边到达后发出它)?

我正在寻找睡眠时间较短的那个更频繁地发生,并在较慢的过程之前看到它多次出现。 提前感谢任何花时间阅读本文的人,尤其是那些可以分享一些见解的人。

Eric,

非确定错是通过 Process.wye 在 scalaz-stream 中实现的,事实上,两者都是使用 Wye 的非确定性组合器之一。你看到它们左/右交错的原因是因为它试图公平,因为你阻止了线程。尝试创建比第二侧慢的一侧,您将看到其中一侧是不确定的。

请注意,为了实现非确定性行为,您实际上需要从两个线程运行的进程,您的 p1 进程实际上阻塞了单个线程,因此在您的场景中,顺序始终是确定性的

尝试:

val p1 = Process(1,2,3).toSource
val p2 = Process(10) fby Process.sleep(1 second) fby Process(20,30).toSource
(p1 either p2).runLog.run.foreach(println)

应该发出

-/(1)
/-(10)
-/(2)
-/(3)
/-(20)
/-(30)

最新更新