Haskell: Optimising a graph processing algorithm



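This is a follow-up to this post; the code is now based on Structuring Depth-First Search Algorithms in Haskell, written by King and Launchbury in the 1990s. That paper proposes a generate-and-prune strategy, but uses a mutable array with a State monad (in syntax that I suspect is partly deprecated by now). The authors hint that a set could be used to remember the visited nodes, at an extra cost of O(log n). I have tried to implement it with a set (we have better machines now than in the 1990s!), using modern State monad syntax and vectors instead of arrays (which I have read is generally better).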

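As before, my code runs on small data sets but fails to return on the 5m-edge graph I need to analyse, and I am looking for hints only about weaknesses when operating at scale. What I do know is that the code runs comfortably within memory, so that is not the problem, but have I inadvertently slipped into O(n^2)? (By contrast, the official implementation of this paper in the Data.Graph library, from which I have lately also borrowed some code, uses a mutable array but fails on the big data set with a ... stack overflow!)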

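So now I have a Vector data store with IntSet state that does not complete, and an "official" version with ST monad arrays that crashes! Surely Haskell should be able to do better than this?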

import Data.Vector (Vector)
import qualified Data.IntSet as IS
import qualified Data.Vector as V
import qualified Data.ByteString.Char8 as BS
import Control.Monad.State
type Vertex   = Int
type Table a  = Vector a
type Graph    = Table [Vertex]
type Edge     = (Vertex, Vertex)
data Tree a   = Node a (Forest a) deriving (Show,Eq)
type Forest a = [Tree a]
-- ghc -O2 -threaded --make
-- +RTS -Nx
generate :: Graph -> Vertex -> Tree Vertex
generate g v = Node v $ map (generate g) (g V.! v)
chop :: Forest Vertex -> State IS.IntSet (Forest Vertex)
chop [] = return []
chop (Node x ts:us) = do
    visited <- contains x
    if visited then
        chop us
    else do
        include x
        x1 <- chop ts
        x2 <- chop us
        return (Node x x1:x2)
prune :: Forest Vertex -> State IS.IntSet (Forest Vertex)
prune vs = chop vs
main = do
    --edges <- V.fromList `fmap` getEdges "testdata.txt"
    edges <- V.fromList `fmap` getEdges "SCC.txt"
    let 
        -- calculate size of five largest SCC
        maxIndex = fst $ V.last edges
        gr = buildG maxIndex edges
        sccRes = scc gr
        big5 = take 5 sccRes
        big5' = map (\l -> length $ postorder l) big5
    putStrLn $ show $ big5'
contains :: Vertex -> State IS.IntSet Bool
contains v = state $ \visited -> (v `IS.member` visited, visited)
include :: Vertex -> State IS.IntSet ()
include v = state $ \visited -> ((), IS.insert v visited)

getEdges :: String -> IO [Edge]
getEdges path = do
    lines <- (map BS.words . BS.lines) `fmap` BS.readFile path
    let pairs = (map . map) (maybe (error "can't read Int") fst . BS.readInt) lines
    return [(a, b) | [a, b] <- pairs] 
vertices :: Graph -> [Vertex]
vertices gr = [1.. (V.length gr - 1)]
edges :: Graph -> [Edge]
edges g = [(u,v) | u <- vertices g, v <- g V.! u]
-- accumulate :: (a -> b -> a)  -> Vector a-> Vector (Int, b)--> Vector a
-- accumulating function f
-- initial vector (of length m)
-- vector of index/value pairs (of length n)
buildG :: Int -> Table Edge -> Graph
buildG maxIndex edges = graph' where
    graph    = V.replicate (maxIndex + 1) []
    --graph'   = V.accumulate (\existing new -> new:existing) graph edges
    -- flip f takes its (first) two arguments in the reverse order of f
    graph'   = V.accumulate (flip (:)) graph edges
mapT :: Ord a => (Vertex -> a -> b) -> Table a -> Table b
mapT = V.imap
outDegree :: Graph -> Table Int
outDegree g = mapT numEdges g
    where numEdges v es = length es
indegree :: Graph -> Table Int
indegree g = outDegree $ transposeG g
transposeG :: Graph -> Graph
transposeG g = buildG (V.length g - 1) (reverseE g)
reverseE :: Graph -> Table Edge
reverseE g = V.fromList [(w, v) | (v,w) <- edges g]
-- --------------------------------------------------------------
postorder :: Tree a -> [a]
postorder (Node a ts) = postorderF ts ++ [a]
postorderF :: Forest a -> [a]
postorderF ts = concat (map postorder ts)
postOrd :: Graph -> [Vertex]
postOrd g = postorderF (dff g)
dfs :: Graph -> [Vertex] -> Forest Vertex
dfs g vs = map (generate g) vs
dfs' :: Graph -> [Vertex] -> Forest Vertex
dfs' g vs = fst $ runState (prune d) $ IS.fromList []
    where d = dfs g vs
dff :: Graph -> Forest Vertex
dff g = dfs' g $ reverse (vertices g)
scc :: Graph -> Forest Vertex
scc g = dfs' g $ reverse $ postOrd (transposeG g)

Some small improvements:

Change

type Edge = (Vertex, Vertex)

to

data Edge = Edge {-# UNPACK #-} !Vertex {-# UNPACK #-} !Vertex

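to reduce the memory usage per edge from 7 words to 3 words and to improve cache locality. Reducing memory pressure almost always improves runtime as well. As @jberryman mentioned, you could use an unboxed vector for Table Edge (and then you don't need the custom data type above).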
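The unboxed-vector variant mentioned above might look roughly like the following. This is a minimal sketch, not the original poster's code: the name EdgeTable is hypothetical, and it assumes that feeding the edges to V.accum as a lazily produced list is acceptable in place of the boxed vector that V.accumulate expects.

```haskell
import qualified Data.Vector as V
import qualified Data.Vector.Unboxed as U

type Vertex = Int
type Edge   = (Vertex, Vertex)
type Graph  = V.Vector [Vertex]

-- Hypothetical name: an unboxed table of edges. Pairs of Ints have an
-- Unbox instance, so no per-edge tuple or boxed Int is allocated.
type EdgeTable = U.Vector Edge

-- Same accumulation as the question's buildG, but reading from the
-- unboxed table. V.accum takes a list of (index, value) pairs.
buildG :: Int -> EdgeTable -> Graph
buildG maxIndex es =
    V.accum (flip (:)) (V.replicate (maxIndex + 1) []) (U.toList es)
```

The adjacency lists themselves are still ordinary boxed lists here; only the edge table changes representation.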

generate :: Graph -> Vertex -> Tree Vertex
generate g v = Node v $ map (generate g) (g V.! v)

If you are sure the index is within bounds, you can use the unsafe indexing function from vector instead of V.!
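As a sketch of that suggestion, generate could use V.unsafeIndex, which skips the bounds check. This assumes every vertex stored in the graph is a valid index; if that does not hold, the behaviour is undefined rather than an exception.

```haskell
import qualified Data.Vector as V

type Vertex   = Int
type Graph    = V.Vector [Vertex]
data Tree a   = Node a (Forest a) deriving (Show, Eq)
type Forest a = [Tree a]

-- Same as the question's generate, but without the bounds check.
-- Only safe if every vertex in the graph is < V.length g.
generate :: Graph -> Vertex -> Tree Vertex
generate g v = Node v (map (generate g) (g `V.unsafeIndex` v))
```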

contains :: Vertex -> State IS.IntSet Bool
contains v = state $ \visited -> (v `IS.member` visited, visited)

Use a combination of get and put $! instead.

include :: Vertex -> State IS.IntSet ()
include v = state $ \visited -> ((), IS.insert v visited)

Use modify' instead.
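Put together, the two state helpers could be written as follows. This is a sketch under the assumption that the mtl version in use exports modify' from Control.Monad.State; includeViaPut and seenAfter are hypothetical names added only for illustration.

```haskell
import Control.Monad.State (State, evalState, get, gets, modify', put)
import qualified Data.IntSet as IS

type Vertex = Int

-- Read-only query: the state is left untouched.
contains :: Vertex -> State IS.IntSet Bool
contains v = gets (IS.member v)

-- Strict update: the new set is forced before it is stored,
-- so no chain of unevaluated inserts builds up.
include :: Vertex -> State IS.IntSet ()
include v = modify' (IS.insert v)

-- The same update spelled out with get and put $! (hypothetical name).
includeViaPut :: Vertex -> State IS.IntSet ()
includeViaPut v = do
    visited <- get
    put $! IS.insert v visited

-- Hypothetical helper for experimenting: mark some vertices, then query one.
seenAfter :: [Vertex] -> Vertex -> Bool
seenAfter vs v = evalState (mapM_ include vs >> contains v) IS.empty
```

Both include and includeViaPut force the set to weak head normal form before storing it, which is the point of the $! in the advice above.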

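You use a lot of lists in your program. Linked lists aren't the most memory/cache-efficient data structure. See whether you can convert your code to use more vectors.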
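One list-related spot that may be worth checking, offered as an assumption rather than a measured finding: postorder builds its result with ++ [a], and left-nested appends copy the list built so far, which can become quadratic on deep trees. The sketch below is not a conversion to vectors; it swaps in an accumulator so that each element is consed exactly once. The helper name postorderInto is hypothetical.

```haskell
data Tree a   = Node a (Forest a) deriving (Show, Eq)
type Forest a = [Tree a]

-- Prepend the postorder of a tree onto an existing list (hypothetical helper).
postorderInto :: Tree a -> [a] -> [a]
postorderInto (Node a ts) acc = foldr postorderInto (a : acc) ts

-- Same results as the question's versions, without repeated (++).
postorder :: Tree a -> [a]
postorder t = postorderInto t []

postorderF :: Forest a -> [a]
postorderF ts = foldr postorderInto [] ts
```

Whether this matters for the 5m-edge input would need profiling; it only removes one possible source of quadratic behaviour.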
