我在保罗·布彻(Paul Butcher(的《7周内7个并发模型》的第6章,重点是core.async
。
我们有以下功能
(defn map-chan [f from]
(let [to (chan)]
(go-loop []
(when-let [x (<! from)]
(>! to (f x))
(println "parking channel write.")
(recur))
(close! to))
(println "map-chan done.")
to))
我自己添加了printlns
,以探索计算的确切顺序,我想在这里问一下。
我们可以这样运行它
(def ch (to-chan (range 10))) ; [1]
(def mapped (map-chan (partial * 2) ch)) ; [2]
(<!! (async/into [] mapped)) ; [3]
;; [1] Create & rtn a channel from els of seq, closing it when seq fin.
;; [2] map-chan returns immediately, with blocked go blocks inside of it.
;; [3] calling async/into finally triggers the parked channel writes, as seen below.
在 REPL 中:
channels.core=> (def ch (to-chan (range 10)))
#'channels.core/ch
channels.core=> (def mapped (map-chan (partial * 2) ch))
map-chan done.
#'channels.core/mapped
channels.core=> (<!! (async/into [] mapped))
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
[0 2 4 6 8 10 12 14 16 18]
channels.core=>
问题
我们这里有一个(同步((即无缓冲(通道,它有写入器和读取器都准备好了。为什么我的"停车通道写入"直到调用async/into
才触发? (触发它的不是用<!!
读取的频道,而是async/into
本身 - 易于检查(。我不是在抱怨这一点,只是想了解为什么痕迹是这样的。 频道实际上也有些懒惰吗?他还没有在书中提到这一点。
请注意,如果这有任何区别,则对此代码的依赖org.clojure/core.async "0.1.267.0-0d7780-alpha"
。
此外,在书中,他使用了长度为 10 的缓冲通道。 然而,我也尝试了使用无缓冲(同步(通道,结果似乎相同。
输出通道to
的大小为零,因此在请求相应的采样之前无法进行写入。 查看代码的修改版本:
(ns tst.demo.core
(:use tupelo.core tupelo.test )
(:require
[clojure.core.async :as async]
))
(defn map-chan [f from]
(let [to (async/chan)]
(async/go
(loop []
(when-let [x (async/<! from)]
(println "put - pre")
(async/>! to (f x))
(println "put - post")
(recur)))
(async/close! to))
(println "map-chan returns output buffer")
to))
(dotest
(println :1)
(spyx
(def ch (async/to-chan (range 10)))) ; [1]
(Thread/sleep 2000) (println :2)
(spyx
(def mapped (map-chan (partial * 2) ch))) ; [2]
(Thread/sleep 2000) (println :3)
(spyx
(async/<!! (async/into [] mapped))) ; [3]
)
结果:
-------------------------------
Clojure 1.10.1 Java 13
-------------------------------
lein test tst.demo.core
:1
(def ch (async/to-chan (range 10))) => #'tst.demo.core/ch
:2
map-chan returns output buffer
(def mapped (map-chan (partial * 2) ch)) => #'tst.demo.core/mapped
put - pre
:3
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
(async/<!! (async/into [] mapped)) => [0 2 4 6 8 10 12 14 16 18]
因此,go 循环确实会立即开始运行,但第一个放置操作会阻塞,直到步骤 [3] 处发生async/into
。
如果我们使用长度为 20 的缓冲输出通道,我们会看到 go 循环在步骤 [3] 发生之前运行:
...
(let [to (async/chan 20)]
...
结果:
:1
(def ch (async/to-chan (range 10))) => #'tst.demo.core/ch
:2
map-chan returns output buffer
(def mapped (map-chan (partial * 2) ch)) => #'tst.demo.core/mapped
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
:3
(async/<!! (async/into [] mapped)) => [0 2 4 6 8 10 12 14 16 18]