并发流不会将任何内容打印到控制台



我正在尝试构建一个关于在fs2中使用Stream.concurrently方法的示例。我正在开发生产者/消费者模式,使用Queue作为共享状态:

import cats.effect.std.{Queue, Random}
object Fs2Tutorial extends IOApp {
val random: IO[Random[IO]] = Random.scalaUtilRandom[IO]
val queue: IO[Queue[IO, Int]] = Queue.bounded[IO, Int](10)
val producer: IO[Nothing] = for {
r <- random
q <- queue
p <-
r.betweenInt(1, 11)
.flatMap(q.offer)
.flatTap(_ => IO.sleep(1.second))
.foreverM
} yield p
val consumer: IO[Nothing] = for {
q <- queue
c <- q.take.flatMap { n =>
IO.println(s"Consumed $n")
}.foreverM
} yield c
val concurrently: Stream[IO, Nothing] = Stream.eval(producer).concurrently(Stream.eval(consumer))
override def run(args: List[String]): IO[ExitCode] = {
concurrently.compile.drain.as(ExitCode.Success)
}
}

我希望程序打印一些"Consumed n",对于一些n。但是,该程序不会向控制台打印任何内容。

上面的代码出了什么问题?

上面的代码有什么问题?

您在消费者和生产者中没有使用相同的Queue,而是他们每个人都在创建自己新的独立Queue(RandomBTW也是如此(

这是新手犯的一个常见错误,他们还没有掌握像IO这样的数据类型背后的主要原理

当你做val queue: IO[Queue[IO, Int]] = Queue.bounded[IO, Int](10)时,你是说queue是一个程序,当它被求值时,会产生一个类型为Queue[IO, Unit]的值,这就是所有这些的要点
程序变成一个值,作为任何值,您都可以以任何方式操纵它来产生新的值,例如使用flatMap,因此当consumer&CCD_ 14通过CCD_ 15和CCD_ 16创建新的独立程序/值。

你可以这样修复代码:

import cats.effect.{IO, IOApp}
import cats.effect.std.{Queue, Random}
import cats.syntax.all._
import fs2.Stream
import scala.concurrent.duration._
object Fs2Tutorial extends IOApp.Simple {  
override final val run: IO[Unit] = {
val resources =
(
Random.scalaUtilRandom[IO],
Queue.bounded[IO, Int](10)
).tupled

val concurrently =
Stream.eval(resources).flatMap {
case (random, queue) =>
val producer = 
Stream
.fixedDelay[IO](1.second)
.evalMap(_ => random.betweenInt(1, 11))
.evalMap(queue.offer)
val consumer =
Stream.fromQueueUnterminated(queue).evalMap(n => IO.println(s"Consumed $n"))

producer.concurrently(consumer)
}

concurrently.interruptAfter(10.seconds).compile.drain >> IO.println("Finished!")
}
}

(您可以在此处看到它在运行(


PS:我建议您查看";作为值的程序">Fabio Labella系列:https://systemfw.org/archive.html