我想在Clojure:中创建一个batch
函数
给定一个delay-ms
,它将在该时间段内批处理对函数f
的调用,并一次性发送所有调用。
这里有一个天真的实现:
(defn delay-f
[f delay-ms]
(let [timer (Timer.)
task (atom nil)
latest-batch (atom [])]
(fn [& args]
(swap! latest-batch conj args)
(when-not @task
(let [new-task (proxy [TimerTask] []
(run []
(f @latest-batch)
(reset! task nil)
(reset! latest-batch [])
(.purge timer)))]
(reset! task new-task)
(.schedule timer new-task delay-ms))))))
我很确定,考虑到我在这里使用原子,存在一个种族条件。
这里的惯用解决方案是什么?
我认为解决这个问题的最好方法是使用定时器库overtone/at,而不是重新发明轮子。特别是overnone.at-at/every函数提供了您想要的日程安排。
定义一个原子来保存要执行的累积任务(也称为"thunks"),以及一个函数来将新任务附加到队列中。
定义一个";执行";传递给every
的函数,该函数将清除队列并按顺序执行在那里找到的每个任务。
Clojure原子将防止任何竞争条件;附加";函数或";执行";函数可以在任何给定的时间点执行。如果两个函数都试图同时修改原子内容,其中一个函数将被迫等待,直到另一个完成。
有关其他库,请参阅主题调度下的Clojure工具箱。
TimerTask仍在执行时,只要调用函数,代码就会可靠地中断。首先,您希望在运行函数之前而不是之后重置任务和最新的批处理原子。这仍然会有比赛条件,尽管可能性较小。我们可以使用ConcurrentLinkedQueue:
(defn delay-f
[f delay-ms]
(let [timer (java.util.Timer.)
task (atom nil)
queue (java.util.concurrent.ConcurrentLinkedQueue.)]
(fn [& args]
(.offer queue args)
(when-not @task
(let [new-task (proxy [java.util.TimerTask] []
(run []
(reset! task nil)
(f (loop [r []]
(if-let [e (.poll queue)]
(recur (conj r e))
r)))
(.purge timer)))]
(reset! task new-task)
(.schedule timer new-task delay-ms))))))