当MAPCAT在Core.Ashc中打破背压时,内存泄漏在哪里



我在clojure中编写了一些core.async代码,当我运行它时,它消耗了所有可用的内存,并且失败了错误。看来在核心中使用 mapcat会破坏背压。(不幸的是,这是不超出此问题范围的原因。)

这是一些代码,可以通过计数:x s进出mapcat ING传感器来证明问题:

(ns mapcat.core
  (:require [clojure.core.async :as async]))
(defn test-backpressure [n length]
  (let [message (repeat length :x)
        input (async/chan)
        transform (async/chan 1 (mapcat seq))
        output (async/chan)
        sent (atom 0)]
    (async/pipe input transform)
    (async/pipe transform output)
    (async/go
      (dotimes [_ n]
        (async/>! input message)
        (swap! sent inc))
      (async/close! input))
    (async/go-loop [x 0]
      (when (= 0 (mod x (/ (* n length) 10)))
        (println "in:" (* @sent length) "out:" x))
      (when-let [_ (async/<! output)]
        (recur (inc x))))))
=> (test-backpressure 1000 10)
in: 10 out: 0
in: 2680 out: 1000
in: 7410 out: 2000
in: 10000 out: 3000 ; Where are the other 7000 characters?
in: 10000 out: 4000
in: 10000 out: 5000
in: 10000 out: 6000
in: 10000 out: 7000
in: 10000 out: 8000
in: 10000 out: 9000
in: 10000 out: 10000

生产者比赛远远领先于消费者。

看来我不是第一个发现这一点的人。但是这里给出的解释似乎并没有涵盖它。(尽管它确实提供了足够的解决方法。)从概念上讲,我希望生产者能够领先,但只有在频道中可能被缓冲的少数消息的长度。

我的问题是,所有其他消息在哪里?由第四行的输出7000 :x s未计算。

更新2020-01-14:现在已修复内存泄漏。

有两种可能的解释"内存泄漏在哪里?"

首先,数据持有在哪里?答案似乎位于频道缓冲区,立即在扩展变换的下游。

默认情况下通道使用 FixedBuffer(clojure.core.async.impl.buffers/fieldbuffer),它可以判断它是否已满,但不反对被覆盖。

其次,哪种代码导致缓冲区被过熟?这个(如果我错了,请纠正我)似乎是在take!的CC_6方法中(CLOJURE.CORE.ASYNC.IMPL.CHANNELS/MANETOMANYCHANNEL),在此,在对full?进行任何调用之前,在缓冲区发生了第一个呼叫add!

看来take!假设它可以为其删除的每个项目添加至少一个项目。如果长期运行扩展传感器(例如mapcat),这并不总是一个安全的假设。

通过将此行更改为coc_12,以core.Async的本地副本中的 (when (and (.hasNext iter) (not (impl/full? buf)))更改,我可以在问题中按预期进行代码。(N.B.我对Core.Async的理解不足以保证这是 your 用例的强大解决方案。)

更新2016-09-17:现在有一个问题:http://dev.clojure.org/jira/jira/browse/ashasync-178

更新2020-01-14:现在已修复到:https://clojure.atlassian.net/browse/ashasync-210(尽管较早的票证为"拒绝")

最新更新