阻止生产者/消费者循环的惯用方法?



为了获得一些良好的并发编程实践,我正在尝试使用Clojure的core.async库实现生产者/消费者模式。一切进展顺利,但我希望能够在某个时间点阻止生产者和消费者。

我当前的代码看起来像这样...

(def c (a/chan 5))
(def alive (atom true))
(def producer (a/go-loop []
(Thread/sleep 1000)
(when @alive 
(a/>! c 1)
(recur))))
(def consumer (a/go-loop []
(Thread/sleep 3000)
(when @alive 
(println (a/<! c))
(recur))))
(do
(reset! alive false)
(a/<!! producer)
(a/<!! consumer))

不幸的是,"do"块似乎偶尔会无限期地阻止。我本质上希望能够阻止两个 go-loop 继续并阻止直到两个循环都退出。线程/睡眠代码用于模拟执行某些工作单元。

我怀疑停止生产者会导致消费者停车,因此挂起,尽管我不确定替代方法,但有什么想法吗?

有关详细信息,请参阅 ClojureDocs。 例:

(let [c (chan 2) ]
(>!! c 1)
(>!! c 2)
(close! c)
(println (<!! c)) ; 1
(println (<!! c)) ; 2
;; since we closed the channel this will return false(we can no longer add values)
(>!! c 1))

对于您的问题,例如:

(let [c        (a/chan 5)
producer (a/go-loop [cnt 0]
(Thread/sleep 1000)
(let [put-result (a/>! c cnt)]
(println "put: " cnt put-result)
(when put-result
(recur (inc cnt)))))
consumer (a/go-loop []
(Thread/sleep 3000)
(let [result (a/<! c)]
(when result
(println "take: " result)
(recur))))]
(Thread/sleep 5000)
(println "closing chan...")
(a/close! c))

有结果

put:  0 true
put:  1 true
take:  0
put:  2 true
put:  3 true
closing chan...
put:  4 false
take:  1
take:  2
take:  3

最新更新