我想定义一个谓词,作为输入某些谓词使用相应的输入(可以作为懒惰的呼叫序列给出(,并行运行它们,并计算逻辑或结果,以这样的方式,当谓词呼叫终止返回true
时,整个计算也终止(返回true
(。
除了提供时间优化之外,这也将有助于避免在某些情况下,非终止(某些谓词调用可能不会终止(。实际上,将非终端解释为第三个undefined
值,这个谓词模拟了克莱恩的K3逻辑中的或操作(在初始中心的kleene代数中加入(。
这里为Haskell家族提供了类似的东西。有什么(最好是简单(在Clojure中执行此操作的方法?
编辑:我决定在阅读评论后添加一些澄清。
(a(首先,线程池耗尽后发生的事情不那么重要。我认为创建一个足够大的线程池是一个合理的惯例。
(b(最关键的要求是谓词调用并行运行,一旦谓词调用终止返回true
,所有其他的线程都会中断。预期的行为是:
- 如果有一个谓词调用返回
true
:并行或返回true
- 否则如果有一个未终止的谓词调用:平行或不终止
- 其他:并行或返回
false
换句话说,它的行为就像false
&lt给出的3元素晶格中的连接一样。undefined
<true
,用undefined
代表非终止。
(c(并行或应能够作为输入许多谓词和许多谓词输入(每个谓词输入与谓词相对应(。但是,如果将其作为输入懒惰序列,那就更好了。然后,命名并行命名或pany
(对于"并行任何"(,我们可以打电话如下:
-
(pany (map (comp eval list) predicates inputs))
-
(pany (map (comp eval list) predicates (repeat input)))
-
(pany (map (comp eval list) (repeat predicate) inputs))
等效于(pany (map predicate (unchunk inputs)))
作为最后一句话,我认为索要pany
,双pall
或一种构建这种早期终止平行减少的机制是很自然的语言等语言。
我将根据还原功能定义我们的谓词。实际上,我们可以重新实现所有Clojure迭代功能以支持此并行操作,但我将以REDY为例。
我将定义一个计算函数。我只使用同一件,但没有什么可以阻止您拥有很多。该函数是" true",如果它累积了1000。
(defn computor [acc val]
(let [new (+' acc val)] (if (> new 1000) (reduced new) new)))
(reduce computor 0 (range))
;; =>
1035
(reduce computor 0 (range Long/MIN_VALUE 0))
;; =>
;; ...this is a proxy for a non-returning computation
;; wrap these up in a form suitable for application of reduction
(def predicates [[computor 0 (range)]
[computor 0 (range Long/MIN_VALUE 0)]])
现在让我们去做这个肉。我想在每个计算中迈出一步,如果其中一个计算完成,我想返回它。实际上,一次使用PMAP一次步骤非常慢 - 工作单位太小,不值得穿线。在这里,我更改了要进行1000个工作单位的迭代,然后再继续进行。您可能会根据您的工作量和一步的成本来调整此内容。
(defn p-or-reducer* [reductions]
(let [splits (map #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
complete (some #(if (empty? (second %)) (last (first %))) splits)]
(or complete (recur (map second splits)))))
我然后将其包裹在驱动程序中。
(defn p-or [s]
(p-or-reducer* (map #(apply reductions %) s)))
(p-or predicates)
;; =>
1035
在哪里插入CPU并行性?S/MAP/PMAP/在P-OR-REDUCER中*应该这样做。我建议只是平行于第一个操作,因为这将驱动还原序列计算。
(defn p-or-reducer* [reductions]
(let [splits (pmap #(split-at 1000 %) reductions) ;; do at least 1000 iterations per cycle
complete (some #(if (empty? (second %)) (last (first %))) splits)]
(or complete (recur (map second splits)))))
(def parallelism-tester (conj (vec (repeat 40000 [computor 0 (range Long/MIN_VALUE 0)]))
[computor 0 (range)]))
(p-or parallelism-tester) ;; terminates even though the first 40K predicates will not
很难定义此的性能通用版本。在不知道每次迭代的成本的情况下,很难得出有效的并行性策略 - 如果一个迭代需要10次,那么我们可能一次采取一步。如果需要100n,那么我们需要一次采取许多步骤。
您会考虑采用core.async
使用async/go
或async/thread
处理并行任务,并使用async/alts!
?
例如,将核心or
函数从串行变为并行。我们可以创建一个宏(我称为por
(,以将输入功能(或谓词(包装到async/thread
中,然后在其中进行插座选择async/alts!
:
(defmacro por [& fns]
`(let [[v# c#] (async/alts!!
[~@(for [f fns]
(list `async/thread f))])]
v#))
(time
(por (do (println "running a") (Thread/sleep 30) :a)
(do (println "running b") (Thread/sleep 20) :b)
(do (println "running c") (Thread/sleep 10) :c)))
;; running a
;; running b
;; running c
;; "Elapsed time: 11.919169 msecs"
;; => :c
与原始or
(以串行运行(相比:
(time
(or (do (println "running a") (Thread/sleep 30) :a)
(do (println "running b") (Thread/sleep 20) :b)
(do (println "running c") (Thread/sleep 10) :c)))
;; running a
;; => :a
;; "Elapsed time: 31.642506 msecs"