Clojure - Core.async Pipeline + 消除混乱



我很难理解我认为在Clojure的异步库中一个非常简单的概念。我实际上是使用管道创建两个通道,其中输出通道是使用输入通道的 take 函数创建的。

根据我的理解,take的目的是限制通道在关闭自身之前将接收的项目数量(如果此时输入通道尚未关闭(。但是,我一直在使用的代码示例并没有产生我预期的结果。

以以下代码为例:

(def in (chan 1))
(def out (async/take 5 in 1))
(doseq [i (range 10)]
(go (>! in i)))
(pipeline 4 out (filter even?) in)
(go-loop []
(when-some [val (<! out)]
(println val)
(recur))))

我期望发生的事情是管道会过滤掉奇数,并且只将偶数传递给"out"通道,当输出通道收到 5 个偶数时,它将关闭。然而,我看到的是打印到 REPL 的奇数和偶数,如下所示:

阿拉伯数字 7 4 0 8 6

此时,输出通道仍未关闭,第二次运行 doseq 将在最终关闭之前打印一些其他值。

我对这里发生的事情感到非常困惑,当使用 take 而不是管道时,它就像一个魅力,当不使用 take 但仍使用管道时,它也可以工作,将两者结合使用似乎是一个完全不同的故事。我在这里错过了一些明显的东西吗?抱歉,如果这是一个简单的错误,这是我第一次(尽管很天真(尝试使用 core.async。

你已经把takepipeline放在了竞争中。 他们俩都从in中获取物品并将它们添加到out. 替换out的定义:

(def out (async/chan 3))

例如,并获得预期的结果

0
2
4
6
8

如果你真的想使用async/take,你可以这样做:

(def first (async/chan 1))
(def second (async/chan 3))
(pipeline 4 second (filter even?) first)
(def third (async/take 3 second))
(defn run []
(go
(doseq [i (range 10)]
(>! first i)))
(go (loop []
(when-some [val (<! third)]
(println val)
(recur)))))

结果:

0
2
4

最新更新