我如何改进这个Clojure组件+异步的例子



我想弄清楚如何最好地创建异步组件,或者以组件友好的方式容纳异步代码。这是我能想出的最好的办法了,而且……就是感觉不太对。

要点:取单词,uppercasereverse,最后是print

问题1:我不能让system在最后停止。我希望看到单个c-chanprintln停止,但不要。

问题2:如何正确注入深度。进入producer/consumer fns?我的意思是,它们不是组件,而且我认为它们不应该是组件,因为它们没有合理的生命周期。

问题3:如何习惯地处理命名为a>bb>casync/pipeline创建副作用?pipeline应该是一个组件吗?

(ns pipelines.core
  (:require [clojure.core.async :as async
             :refer [go >! <! chan pipeline-blocking close!]]
            [com.stuartsierra.component :as component]))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PIPELINES
(defn a>b [a> b>]
  (pipeline-blocking 4
                     b>
                     (map clojure.string/upper-case)
                     a>))
(defn b>c [b> c>]
  (pipeline-blocking 4
                     c>
                     (map (comp (partial apply str)
                                reverse))
                     b>))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PRODUCER / CONSUMER
(defn producer [a>]
  (doseq [word ["apple" "banana" "carrot"]]
    (go (>! a> word))))
(defn consumer [c>]
  (go (while true
        (println "Your Word Is: " (<! c>)))))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; SYSTEM
(defn pipeline-system [config-options]
  (let [c-chan (reify component/Lifecycle
                 (start [this]
                   (println "starting chan: " this)
                   (chan 1))
                 (stop [this]
                   (println "stopping chan: " this)
                   (close! this)))]
    (-> (component/system-map
         :a> c-chan
         :b> c-chan
         :c> c-chan)
        (component/using {}))))

;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; RUN IT!
(def system (atom nil))
(let [_      (reset! system (component/start (pipeline-system {})))
      _      (a>b (:a> @system) (:b> @system))
      _      (b>c (:b> @system) (:c> @system))
      _      (producer (:a> @system))
      _      (consumer (:c> @system))
      _      (component/stop @system)])

编辑:

我开始考虑以下内容,但我不太确定它是否正确关闭…

(extend-protocol component/Lifecycle
  clojure.core.async.impl.channels.ManyToManyChannel
  (start [this]
    this)
  (stop [this]
    (close! this)))

我稍微重写了一下你的例子,使它可以重新加载:

<标题> 可写管道
(ns pipeline
  (:require [clojure.core.async :as ca :refer [>! <!]]
            [clojure.string :as s]))
(defn upverse [from to]
  (ca/pipeline-blocking 4
                        to
                        (map (comp s/upper-case
                                   s/reverse))
                        from))
(defn produce [ch xs]
  (doseq [word xs]
    (ca/go (>! ch word))))
(defn consume [ch]
  (ca/go-loop []
              (when-let [word (<! ch)]
                (println "your word is:" word)
                (recur))))
(defn start-engine []
  (let [[from to] [(ca/chan) (ca/chan)]]
    (upverse to from)
    (consume from)
    {:stop (fn []
             (ca/close! to)
             (ca/close! from)
             (println "engine is stopped"))
     :process (partial produce to)}))

这样你就可以使用(start-engine)并使用它来处理字序列:

<标题> REPL时间
boot.user=> (require '[pipeline])
boot.user=> (def engine (pipeline/start-engine))
#'boot.user/engine

运行

boot.user=> ((engine :process) ["apple" "banana" "carrot"])
your word is: TORRAC
your word is: ANANAB
your word is: ELPPA
boot.user=> ((engine :process) ["do" "what" "makes" "sense"])
your word is: OD
your word is: SEKAM
your word is: ESNES
your word is: TAHW

阻止它

boot.user=> ((:stop engine))
engine is stopped
;; engine would not process anymore
boot.user=> ((engine :process) ["apple" "banana" "carrot"])
nil
<标题>状态管理

根据你打算如何使用这个管道,可能根本不需要像Component这样的状态管理框架:不需要添加任何"以防万一"的东西,在这种情况下,启动和停止管道只是调用两个函数的问题。

然而,如果这个管道在一个更大的应用程序中使用,有更多的状态,你肯定可以从状态管理库中受益。

我不是Component的粉丝,主要是因为它需要一个完整的应用程序购买(这使它成为一个框架),但我尊重其他人使用它。

我建议在应用程序很小的情况下不要使用任何特定的东西:例如,您可以将这个管道与其他管道/逻辑组合在一起,并从-main中启动它,但如果应用程序更大并且有更多不相关的状态,您需要做的就是添加mount到它:

(defstate engine :start (start-engine)
                 :stop ((:stop engine)))

开始管道
boot.user=> (mount/start)
{:started ["#'pipeline/engine"]}

运行

boot.user=> ((engine :process) ["do" "what" "makes" "sense"])
your word is: OD
your word is: SEKAM
your word is: ESNES
your word is: TAHW

阻止它

boot.user=> (mount/stop)
engine is stopped
{:stopped ["#'pipeline/engine"]}

这里是一个包含build.boot的完整示例的要点。

你可以通过boot repl下载并使用它


[EDIT]:回复评论

如果你已经迷上了组件,下面的内容应该可以让你开始:

(defrecord WordEngine []
  component/Lifecycle
  (start [component]
    (merge component (start-engine)))
  (stop [component]
    ((:stop component))
    (assoc component :process nil :stop nil)))

这将在开始时创建一个WordEngine对象,该对象将具有:process 方法

你将无法调用它作为一个普通的Clojure函数:即从REPL或任何命名空间只是通过:require,除非你传递一个引用到整个系统,这是不推荐的。

所以为了调用它,这个WordEngine需要插入到一个组件系统中,并注入到另一个组件中,然后该组件可以解构:process函数并调用它。

最新更新