当所有作业完成时,关闭生产者端的通道



对于我的Mandelbrot explorer项目,我需要运行几个昂贵的作业,最好是并行运行。我决定尝试将作业分块,并在自己的thread中运行每个块,最终得到了类似的东西

(defn point-calculator [chunk-size points]
(let [out-chan (chan (count points))
chunked (partition chunk-size points)]
(doseq [chunk chunked]
(thread
(let [processed-chunk (expensive-calculation chunk)]
(>!! out-chan processed-chunk))))
out-chan))

其中,points是要测试的[实坐标,虚坐标]的列表,expensive-calculation是获取块并测试块中的每个点的函数。每个区块可能需要很长时间才能完成(根据区块大小和作业数量,可能需要一分钟或更长时间)。

在我的消费者端,我使用

(loop []
(when-let [proc-chunk (<!! result-chan)]
; Do stuff with chunk
(recur)))

消耗每个已处理的块。现在,由于通道仍然打开,当最后一个块被消耗时,这个块就会被阻塞。

工作完成后,我需要一种关闭频道的方法。事实证明,这很困难,因为生产者循环具有异步性。我不能简单地在doseq之后放一个close!,因为循环不会阻塞,也不能在最后一个索引作业完成时关闭,因为顺序不确定。

我能想到的最好的想法是维护一个作业的(atom #{}),每个作业完成时维护disj。然后,我可以在循环中检查设置的大小,当它为0时检查close!,或者在原子上附加一个手表并在那里进行检查。

不过,这似乎很粗鲁。有没有更惯用的方法来处理这个问题?这种情况是否表明我使用async不正确?

我会看看core-async中的take函数。这就是它的文档所说的:

"返回一个通道,该通道最多从ch返回n个项目。在n个项目之后已返回,或者ch已关闭,返回通道将关闭。">

因此,它将引导您找到一个简单的解决方案:您可以将其封装到take:中,而不是返回out-chan

(clojure.core.async/take (count chunked) out-chan)

这应该行得通。此外,我建议您将您的示例从阻塞put/get重写为parking(<!>!),并将thread重写为go / go-loop,这是核心异步更常用的用法。

您可能希望使用异步/管道(-blocking)来控制并行性。并使用aysnc/onto-chan在复制完所有块后自动关闭输入通道。

例如,以下示例显示了当平行度设置为16时,经过时间的16x改进。

(defn expensive-calculation [pts]
(Thread/sleep 100)
(reduce + pts))
(time
(let [points     (take 10000 (repeatedly #(rand 100)))
chunk-size 500
inp-chan   (chan)
out-chan   (chan)]
(go-loop [] (when-let [res (<! out-chan)]
;; do stuff with chunk
(recur)))
(pipeline-blocking 16 out-chan (map expensive-calculation) inp-chan)
(<!! (onto-chan inp-chan (partition-all chunk-size points)))))

最新更新