如何使用Clojure Core Async间隔' n '秒调度函数列表



在我的Clojure项目中,我试图使http调用API的列表,该API具有速率限制器,仅允许每分钟n调用。我希望在所有http调用完成后返回每个响应以进行进一步处理。我是Clojure的核心Async的新手,但认为这将是一个很好的选择,但是因为我需要运行每个调用n秒分开我也试图使用Chime库。在Chime的库中,它有使用Core Async的示例,但示例都在每个时间间隔调用相同的函数,这将不适用于本用例。

虽然可能有一种方法可以使用chime-async更好地服务于这个用例,但我所有的尝试都失败了,所以我尝试简单地用核心异步包装Chime调用,但我可能对核心异步比Chime更困惑。

这是我的命名空间的一个例子。

(ns mp.util.schedule
(:require [chime.core :as chime]
[clojure.core.async :as a]
[tick.alpha.api :as tick]))
(defn schedule-fns
"Takes a list of functions and a duration in seconds then runs each function in the list `sec` seconds apart
optionally provide an inst to start from"
[fs sec & [{:keys [inst] :or {inst (tick/now)}}]]
(let [ch (a/chan (count fs))
chime-times (map-indexed
(fn mapped-fn [i f]
(a/put! ch (chime/chime-at [(.plusSeconds inst (* i sec))]
(fn wrapped-fn [_] (f)))))
fs)]
(doseq [chi chime-times]
(a/<!! chi))))
; === Test Code ===
; simple test function
(defn sim-fn
"simple function that prints a message and value, then returns the value"
[v m]
(println m :at (tick/now))
v)
; list of test functions
(def fns [#(sim-fn 1 :one)
#(sim-fn 2 :two)
#(sim-fn 3 :three)])

调用(schedule-fns fns 2)时,我想要发生的是fns中的每个函数相互运行n秒,schedule-fns返回(1 2 3)(函数的返回值),但这不是它正在做的。它在正确的时间调用每个函数(我可以从日志语句中看到),但它没有返回任何东西,并且有一个我不理解的错误。我:

(schedule-fns fns 2)
:one :at #time/instant "2021-03-05T23:31:52.565Z"
Execution error (IllegalArgumentException) at clojure.core.async.impl.protocols/eval11496$fn$G (protocols.clj:15).
No implementation of method: :take! of protocol: #'clojure.core.async.impl.protocols/ReadPort found for class: java.lang.Boolean
:two :at #time/instant "2021-03-05T23:31:54.568Z"
:three :at #time/instant "2021-03-05T23:31:56.569Z"

如果我能得到帮助让我的代码正确使用Core Async(有或没有Chime),我真的很感激。谢谢。

试试这个:

(defn sim-fn
"simple function that prints a message and value, then returns the value"
[v m]
(println m)
v)
; list of test functions
(def fns [#(sim-fn 1 :one)
#(sim-fn 2 :two)
#(sim-fn 3 :three)])
(defn schedule-fns [fns sec]
(let [program (interpose #(Thread/sleep (* sec 1000))
fns)]
(remove #(= % nil)
(for [p program]
(p)))))

然后调用:

> (schedule-fns fns 2)
:one
:two
:three
=> (1 2 3)

我想到了一个方法来得到我想要的…

(def results (atom []))
(defn schedule-fns
"Takes a list of functions and a duration in seconds then runs each function in the list `sec` seconds apart
optionally provide an inst to start from"
[fs sec]
(let [ch (chan (count fs))]
(go-loop []
(swap! results conj (<! ch))
(recur))
(map-indexed (fn [i f]
(println :waiting (* i sec) :seconds)
(go (<! (timeout (* i sec 1000)))
(>! ch (f))))
fs)))

这段代码具有我想要的计时和行为,但是我必须使用atom来存储响应。虽然我可以添加一个监视程序来确定所有结果何时出现,但我仍然觉得不应该这样做。

我想我现在会使用这种方法,但在某种程度上,我会继续研究它,如果有人有比这种方法更好的方法,我很乐意看到它。

我有几个朋友看了这个,他们每个人都有不同的答案。这些肯定比我刚才做的好。

(defn schedule-fns [fs secs]
(let [ret (atom {})
sink (a/chan)]
(doseq [[n f] (map-indexed vector fs)]
(a/thread (a/<!! (a/timeout (* 1000 n secs)))
(let [val (f)
this-ret (swap! ret assoc n val)]
(when (= (count fs) (count this-ret))
(a/>!! sink (mapv (fn [i] (get this-ret i)) (range (count fs))))))))
(a/<!! sink)))

(defn schedule-fns
[fns sec]
(let [concurrent (count fns)
output-chan (a/chan)
timedout-coll (map-indexed (fn [i f]
#(do (println "Waiting")
(a/<!! (a/timeout (* 1000 i sec)))
(f))) fns)]
(a/pipeline-blocking concurrent
output-chan
(map (fn [f] (f)))
(a/to-chan timedout-coll))
(a/<!! (a/into [] output-chan))))

如果您的目标是绕过速率限制器,您可以考虑在异步通道中实现它。下面是一个示例实现—该函数接受一个通道,使用基于令牌的限制器对其输入进行限制,并将其输送到输出通道。

(require '[clojure.core.async :as async])
(defn rate-limiting-ch [input xf rate]
(let [tokens (numerator rate)
period (denominator rate)
ans    (async/chan tokens xf)
next   (fn [] (+ period (System/currentTimeMillis)))]
(async/go-loop [c tokens
t (next)]
(if (zero? c)
(do
(async/<! (async/timeout (- t (System/currentTimeMillis))))
(recur tokens (next)))
(when-let [x (async/<! input)]
(async/>! ans x)
(recur (dec c) t))))
ans))

下面是一个示例用法:

(let [start  (System/currentTimeMillis)
input  (async/to-chan (range 10))
output (rate-limiting-ch input
;; simulate an api call with roundtrip time of ~300ms
(map #(let [wait (rand-int 300)
ans  {:time  (- (System/currentTimeMillis) start)
:wait  wait
:input %}]
(Thread/sleep wait)
ans))
;; rate limited to 2 calls per 1000ms
2/1000)]
;; consume the output
(async/go-loop []
(when-let [x (async/<! output)]
(println x)
(recur))))

输出:

{:time 4, :wait 63, :input 0}
{:time 68, :wait 160, :input 1}
{:time 1003, :wait 74, :input 2}
{:time 1079, :wait 151, :input 3}
{:time 2003, :wait 165, :input 4}
{:time 2169, :wait 182, :input 5}
{:time 3003, :wait 5, :input 6}
{:time 3009, :wait 18, :input 7}
{:time 4007, :wait 138, :input 8}
{:time 4149, :wait 229, :input 9}

最新更新