如果与[org.clojure/clojure "1.10.1"],[org.clojure/core.async "1.2.603"]
和最新的AmazonCorretto 11 JVM有关,我将使用它们。
以下代码是生产中使用的代码的简化版本,它确实会导致内存泄漏。我不知道为什么会发生这种情况,但怀疑可能是由于频道的故障。有人能帮我指出我的代码哪里可能出错,或者我如何修复内存泄漏吗?
(ns test-gc.core
(:require [clojure.core.async :as a :refer [chan put! close! <! go >! go-loop timeout]])
(:import [java.util UUID]))
(def global-msg-ch (chan (a/sliding-buffer 200)))
(def global-msg-pub (a/pub global-msg-ch :id))
(defn io-promise []
(let [id (UUID/randomUUID)
ch (chan)]
(a/sub global-msg-pub id ch)
[id (go
(let [x (<! ch)]
(a/unsub global-msg-pub id ch)
(:data x)))]))
(defn -main []
(go-loop []
(<! (timeout 1))
(let [[pid pch] (io-promise)
cmd {:id pid
:data (rand-int 1E5)}]
(>! global-msg-ch cmd)
(println (<! pch)))
(recur))
(while true
(Thread/yield)))
快速堆转储提供以下统计信息,例如:
按实例数分类
java.util.LinkedList
5157128(14.4%(java.util.concurrent.atomic.AtomicReference
3698382(10.3%(clojure.lang.Atom
3094279(8.6%(
按实例大小分类
java.lang.Object[]
210061752 B(13.8%(java.util.LinkedList
206285120 B(13.6%(clojure.lang.Atom
148525392 B(9.8%(clojure.core.async.impl.channels.ManyToManyChannel
132022336b(8.7%(
我终于明白了原因。通过查看源代码,我们得到以下片段:
(defn pub
"Creates and returns a pub(lication) of the supplied channel, ..."
...
(let [mults (atom {}) ;;topic->mult
ensure-mult (fn [topic]
(or (get @mults topic)
(get (swap! mults
#(if (% topic) % (assoc % topic (mult (chan (buf-fn topic))))))
topic)))
p (reify
Mux
(muxch* [_] ch)
Pub
(sub* [p topic ch close?]
(let [m (ensure-mult topic)]
(tap m ch close?)))
(unsub* [p topic ch]
(when-let [m (get @mults topic)]
(untap m ch)))
(unsub-all* [_] (reset! mults {}))
(unsub-all* [_ topic] (swap! mults dissoc topic)))]
...
p)))
我们可以看到mults
存储了所有的topic
,因此如果我们不清除它,它将单调增加。我们可以添加类似(a/unsub-all* global-msg-pub pid)
的东西来修复它。