如何实现平行逻辑或在clojure中的尽早终止



我想定义一个谓词,作为输入某些谓词使用相应的输入(可以作为懒惰的呼叫序列给出(,并行运行它们,并计算逻辑或结果,以这样的方式,当谓词呼叫终止返回true时,整个计算也终止(返回true(。

除了提供时间优化之外,这也将有助于避免在某些情况下,非终止(某些谓词调用可能不会终止(。实际上,将非终端解释为第三个undefined值,这个谓词模拟了克莱恩的K3逻辑中的或操作(在初始中心的kleene代数中加入(。

这里为Haskell家族提供了类似的东西。有什么(最好是简单(在Clojure中执行此操作的方法?

编辑:我决定在阅读评论后添加一些澄清。

(a(首先,线程池耗尽后发生的事情不那么重要。我认为创建一个足够大的线程池是一个合理的惯例。

(b(最关键的要求是谓词调用并行运行,一旦谓词调用终止返回true,所有其他的线程都会中断。预期的行为是:

  • 如果有一个谓词调用返回true:并行或返回true
  • 否则如果有一个未终止的谓词调用:平行或不终止
  • 其他:并行或返回false

换句话说,它的行为就像false&lt给出的3元素晶格中的连接一样。undefined<true,用undefined代表非终止。

(c(并行或应能够作为输入许多谓词和许多谓词输入(每个谓词输入与谓词相对应(。但是,如果将其作为输入懒惰序列,那就更好了。然后,命名并行命名或pany(对于"并行任何"(,我们可以打电话如下:

  • (pany (map (comp eval list) predicates inputs))
  • (pany (map (comp eval list) predicates (repeat input)))
  • (pany (map (comp eval list) (repeat predicate) inputs))等效于(pany (map predicate (unchunk inputs)))

作为最后一句话,我认为索要pany,双pall或一种构建这种早期终止平行减少的机制是很自然的语言等语言。

我将根据还原功能定义我们的谓词。实际上,我们可以重新实现所有Clojure迭代功能以支持此并行操作,但我将以REDY为例。

我将定义一个计算函数。我只使用同一件,但没有什么可以阻止您拥有很多。该函数是" true",如果它累积了1000。

(defn computor [acc val]
        (let [new (+' acc val)] (if (> new 1000) (reduced new) new)))
(reduce computor 0 (range))
;; =>
1035
(reduce computor 0 (range Long/MIN_VALUE 0))
;; =>
;; ...this is a proxy for a non-returning computation
;; wrap these up in a form suitable for application of reduction
(def predicates [[computor 0 (range)] 
                 [computor 0 (range Long/MIN_VALUE 0)]])

现在让我们去做这个肉。我想在每个计算中迈出一步,如果其中一个计算完成,我想返回它。实际上,一次使用PMAP一次步骤非常慢 - 工作单位太小,不值得穿线。在这里,我更改了要进行1000个工作单位的迭代,然后再继续进行。您可能会根据您的工作量和一步的成本来调整此内容。

(defn p-or-reducer* [reductions]
        (let [splits (map #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
              complete (some #(if (empty? (second %)) (last (first %))) splits)]
          (or complete (recur (map second splits)))))

我然后将其包裹在驱动程序中。

(defn p-or [s]
  (p-or-reducer* (map #(apply reductions %) s)))
(p-or predicates)
;; =>
1035

在哪里插入CPU并行性?S/MAP/PMAP/在P-OR-REDUCER中*应该这样做。我建议只是平行于第一个操作,因为这将驱动还原序列计算。

(defn p-or-reducer* [reductions]
        (let [splits (pmap #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
              complete (some #(if (empty? (second %)) (last (first %))) splits)]
          (or complete (recur (map second splits)))))
(def parallelism-tester (conj (vec (repeat 40000 [computor 0 (range Long/MIN_VALUE 0)]))
                             [computor 0 (range)]))
(p-or parallelism-tester) ;; terminates even though the first 40K predicates will not

很难定义此的性能通用版本。在不知道每次迭代的成本的情况下,很难得出有效的并行性策略 - 如果一个迭代需要10次,那么我们可能一次采取一步。如果需要100n,那么我们需要一次采取许多步骤。

您会考虑采用core.async使用async/goasync/thread处理并行任务,并使用async/alts!

提早返回?

例如,将核心or函数从串行变为并行。我们可以创建一个宏(我称为por(,以将输入功能(或谓词(包装到async/thread中,然后在其中进行插座选择async/alts!

(defmacro por [& fns]
  `(let [[v# c#] (async/alts!!
                  [~@(for [f fns]
                       (list `async/thread f))])]
     v#))
(time
 (por (do (println "running a") (Thread/sleep 30) :a)
      (do (println "running b") (Thread/sleep 20) :b)
      (do (println "running c") (Thread/sleep 10) :c)))
;; running a
;; running b
;; running c
;; "Elapsed time: 11.919169 msecs"
;; => :c

与原始or(以串行运行(相比:

(time
 (or (do (println "running a") (Thread/sleep 30) :a)
     (do (println "running b") (Thread/sleep 20) :b)
     (do (println "running c") (Thread/sleep 10) :c)))
;; running a
;; => :a
;; "Elapsed time: 31.642506 msecs"

最新更新