我试图解析一个有一百万行的文件,每行都是一个json字符串,其中包含一些关于一本书的信息(作者、内容等)。我正在使用iota加载文件,因为如果我尝试使用slurp
,我的程序会抛出一个OutOfMemoryError
。我还使用cheshire来解析字符串。该程序只需加载一个文件并统计所有书籍中的所有单词。
我的第一次尝试包括pmap
来做繁重的工作,我认为这基本上会利用我所有的cpu核心。
(ns multicore-parsing.core
(:require [cheshire.core :as json]
[iota :as io]
[clojure.string :as string]
[clojure.core.reducers :as r]))
(defn words-pmap
[filename]
(letfn [(parse-with-keywords [str]
(json/parse-string str true))
(words [book]
(string/split (:contents book) #"s+"))]
(->>
(io/vec filename)
(pmap parse-with-keywords)
(pmap words)
(r/reduce #(apply conj %1 %2) #{})
(count))))
虽然它似乎使用了所有核心,但每个核心很少使用超过50%的容量,我的猜测是这与pmap的批量大小有关,所以我偶然发现了一个相对古老的问题,其中一些评论引用了clojure.core.reducers
库。
我决定用reducers/map
:重写函数
(defn words-reducers
[filename]
(letfn [(parse-with-keywords [str]
(json/parse-string str true))
(words [book]
(string/split (:contents book) #"s+"))]
(->>
(io/vec filename)
(r/map parse-with-keywords)
(r/map words)
(r/reduce #(apply conj %1 %2) #{})
(count))))
但cpu的使用情况更糟,与之前的实现相比,完成甚至需要更长的时间
multicore-parsing.core=> (time (words-pmap "./dummy_data.txt"))
"Elapsed time: 20899.088919 msecs"
546
multicore-parsing.core=> (time (words-reducers "./dummy_data.txt"))
"Elapsed time: 28790.976455 msecs"
546
我做错了什么?解析大文件时,mmap loading+reducers是正确的方法吗?
编辑:这是我正在使用的文件。
编辑2:以下是iota/seq
而非iota/vec
:的计时
multicore-parsing.core=> (time (words-reducers "./dummy_data.txt"))
"Elapsed time: 160981.224565 msecs"
546
multicore-parsing.core=> (time (words-pmap "./dummy_data.txt"))
"Elapsed time: 160296.482722 msecs"
546
我不认为归约器是适合您的解决方案,因为它们根本无法很好地处理懒惰序列(归约器会用懒惰序列给出正确的结果,但不会很好地并行)。
你可能想看看《七周内的七个并发模型》一书中的这个示例代码(免责声明:我是作者),它解决了一个类似的问题(计算每个单词在维基百科上出现的次数)。
给定一个维基百科页面列表,此函数按顺序计算单词(get-words
返回页面中的单词序列):
(defn count-words-sequential [pages]
(frequencies (mapcat get-words pages)))
这是一个使用pmap
的并行版本,运行速度更快,但仅快1.5倍左右:
(defn count-words-parallel [pages]
(reduce (partial merge-with +)
(pmap #(frequencies (get-words %)) pages)))
它只快1.5倍的原因是reduce
成为了一个瓶颈——它为每个页面调用(partial merge-with +)
一次。在4核机器上,合并100页的批次可将性能提高到3.2倍左右:
(defn count-words [pages]
(reduce (partial merge-with +)
(pmap count-words-sequential (partition-all 100 pages))))