我想弄清楚如何最好地创建异步组件,或者以组件友好的方式容纳异步代码。这是我能想出的最好的办法了,而且……就是感觉不太对。
要点:取单词,uppercase
和reverse
,最后是print
。
问题1:我不能让system
在最后停止。我希望看到单个c-chan
的println
停止,但不要。
问题2:如何正确注入深度。进入producer
/consumer
fns?我的意思是,它们不是组件,而且我认为它们不应该是组件,因为它们没有合理的生命周期。
问题3:如何习惯地处理命名为a>b
和b>c
的async/pipeline
创建副作用?pipeline
应该是一个组件吗?
(ns pipelines.core
(:require [clojure.core.async :as async
:refer [go >! <! chan pipeline-blocking close!]]
[com.stuartsierra.component :as component]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PIPELINES
(defn a>b [a> b>]
(pipeline-blocking 4
b>
(map clojure.string/upper-case)
a>))
(defn b>c [b> c>]
(pipeline-blocking 4
c>
(map (comp (partial apply str)
reverse))
b>))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PRODUCER / CONSUMER
(defn producer [a>]
(doseq [word ["apple" "banana" "carrot"]]
(go (>! a> word))))
(defn consumer [c>]
(go (while true
(println "Your Word Is: " (<! c>)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; SYSTEM
(defn pipeline-system [config-options]
(let [c-chan (reify component/Lifecycle
(start [this]
(println "starting chan: " this)
(chan 1))
(stop [this]
(println "stopping chan: " this)
(close! this)))]
(-> (component/system-map
:a> c-chan
:b> c-chan
:c> c-chan)
(component/using {}))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; RUN IT!
(def system (atom nil))
(let [_ (reset! system (component/start (pipeline-system {})))
_ (a>b (:a> @system) (:b> @system))
_ (b>c (:b> @system) (:c> @system))
_ (producer (:a> @system))
_ (consumer (:c> @system))
_ (component/stop @system)])
编辑:
我开始考虑以下内容,但我不太确定它是否正确关闭…
(extend-protocol component/Lifecycle
clojure.core.async.impl.channels.ManyToManyChannel
(start [this]
this)
(stop [this]
(close! this)))
我稍微重写了一下你的例子,使它可以重新加载:
<标题> 可写管道(ns pipeline
(:require [clojure.core.async :as ca :refer [>! <!]]
[clojure.string :as s]))
(defn upverse [from to]
(ca/pipeline-blocking 4
to
(map (comp s/upper-case
s/reverse))
from))
(defn produce [ch xs]
(doseq [word xs]
(ca/go (>! ch word))))
(defn consume [ch]
(ca/go-loop []
(when-let [word (<! ch)]
(println "your word is:" word)
(recur))))
(defn start-engine []
(let [[from to] [(ca/chan) (ca/chan)]]
(upverse to from)
(consume from)
{:stop (fn []
(ca/close! to)
(ca/close! from)
(println "engine is stopped"))
:process (partial produce to)}))
这样你就可以使用(start-engine)
并使用它来处理字序列:
boot.user=> (require '[pipeline])
boot.user=> (def engine (pipeline/start-engine))
#'boot.user/engine
运行
boot.user=> ((engine :process) ["apple" "banana" "carrot"])
your word is: TORRAC
your word is: ANANAB
your word is: ELPPA
boot.user=> ((engine :process) ["do" "what" "makes" "sense"])
your word is: OD
your word is: SEKAM
your word is: ESNES
your word is: TAHW
阻止它
boot.user=> ((:stop engine))
engine is stopped
;; engine would not process anymore
boot.user=> ((engine :process) ["apple" "banana" "carrot"])
nil
<标题>状态管理根据你打算如何使用这个管道,可能根本不需要像Component这样的状态管理框架:不需要添加任何"以防万一"的东西,在这种情况下,启动和停止管道只是调用两个函数的问题。
然而,如果这个管道在一个更大的应用程序中使用,有更多的状态,你肯定可以从状态管理库中受益。
我不是Component的粉丝,主要是因为它需要一个完整的应用程序购买(这使它成为一个框架),但我尊重其他人使用它。 山我建议在应用程序很小的情况下不要使用任何特定的东西:例如,您可以将这个管道与其他管道/逻辑组合在一起,并从-main
中启动它,但如果应用程序更大并且有更多不相关的状态,您需要做的就是添加mount到它:
(defstate engine :start (start-engine)
:stop ((:stop engine)))
开始管道boot.user=> (mount/start)
{:started ["#'pipeline/engine"]}
运行
boot.user=> ((engine :process) ["do" "what" "makes" "sense"])
your word is: OD
your word is: SEKAM
your word is: ESNES
your word is: TAHW
阻止它
boot.user=> (mount/stop)
engine is stopped
{:stopped ["#'pipeline/engine"]}
这里是一个包含build.boot
的完整示例的要点。
你可以通过boot repl
下载并使用它
[EDIT]:回复评论
如果你已经迷上了组件,下面的内容应该可以让你开始:
(defrecord WordEngine []
component/Lifecycle
(start [component]
(merge component (start-engine)))
(stop [component]
((:stop component))
(assoc component :process nil :stop nil)))
这将在开始时创建一个WordEngine
对象,该对象将具有:process
方法。
你将无法调用它作为一个普通的Clojure函数:即从REPL或任何命名空间只是通过:require
,除非你传递一个引用到整个系统,这是不推荐的。
所以为了调用它,这个WordEngine
需要插入到一个组件系统中,并注入到另一个组件中,然后该组件可以解构:process
函数并调用它。