为了尝试理解core.async,我尝试实现"天网100万微基准",这是:
创建一个角色(goroutine,随便什么(,生成10个新Actor, 他们每个人都会再生成 10 个演员,依此类推,直到 100 万演员 在最后一关创建。然后,他们每个人都返回其 序数(从 0 到 999999(,在前一个上一个上求和 水平并发送回上游,直到到达根参与者。(的 答案应该是499999500000(。
这里有多种语言的实现:
https://github.com/atemerev/skynet
这是我完全失败的尝试:
(defn skynet [chan num size div]
(if (= 1 size)
(>! chan num)
(>! chan (reduce + (let [rc (async/chan)
n (/ size div)]
(doall (for [i [0 div]]
(skynet rc (+ num (* i n)) n div))
(for [i [0 div]] (<! rc))))))))
我试图从 REPL 的一个街区内调用这一切:
(time (go (<!! (skynet (async/chan) 0 1000000 10))))
我可能对有关core.async的许多事情感到非常困惑(以及惰性评估(。
我应该如何解决这个问题,为什么?
core.async 能够执行的操作存在一些限制,因此您无法使用 map
或 for
函数。
您的实现非常接近正确的实现。几点:
-
go
== 一个进程,所以你只创建一个进程,而不是 1m -
<!!
将在围棋块外使用 -
<!
将在围棋块中使用 - 您不正确地使用
-
doall
只接受一个参数
一个可能可以改进的工作实现:
(defn skynet [parent num size div]
(go ;; We create a new process each time skynet is called
(if (= 1 size)
(>! parent num)
(let [self (chan)
new-size (/ size div)]
(dotimes [i div] ;; dotimes is more explicit for side effects
(skynet self (+ num (* i new-size)) new-size div))
(loop [i div ;; Manual reduce
t 0]
(if (zero? i)
(>! parent t)
(recur (dec i)
(+ t (<! self)))))))))
并称之为:
(time
(do
(def result (chan))
(def x (skynet result 0 1000000 10))
(<!! result)))