优化大型向量的操作



这是我之前关于处理5.1m边有向图的矢量表示的问题的后续问题。我正试图实现Kosaraju的图形算法,因此需要按照深度优先搜索(DFS)的完成时间顺序重新排列我的向量。我有代码在小数据集上运行,但不能在10分钟内返回完整的数据集。(我不能排除从大图中产生循环,但在我的测试数据中没有这种迹象。)

DFS需要避免重访节点,所以我需要某种"状态"来进行搜索(目前是一个元组,我应该使用state Monad吗?)。第一次搜索应该返回一个重新排序的Vector,但我现在通过返回一个重新排序的Node索引列表来保持事情的简单,以便我可以随后一次处理Vector。

我认为问题出在dfsInner。下面的代码"记住"访问的节点,更新每个节点的已探索字段(第三个守卫)。尽管我试图使其尾部递归,但代码的内存使用量似乎增长得相当快。我需要强制执行一些严格的,如果是,如何?(我有另一个版本,我在单次搜索中使用,它通过查看堆栈上未探索边的开始节点和已完成的节点列表来检查以前的访问。它不会快速增长,但对于任何连接良好的节点都不会返回。)

然而,它也可能是foldr',但是我如何检测到?

这应该是Coursera的作业,但我不再确定我可以勾选荣誉代码按钮!学习更重要,所以我不想要一个复制/粘贴的答案。我所拥有的并不是很优雅——它也有一种命令式的感觉,这是由保持某种状态的问题驱动的——参见第三个警卫。欢迎对设计模式提出意见。

type NodeName = Int
type Edges    = [NodeName]
type Explored = Bool
type Stack    = [(Int, Int)]
data Node  = Node NodeName Explored Edges Edges deriving (Eq, Show)
type Graph = Vector Node
main = do
    edges <- V.fromList `fmap` getEdges "SCC.txt"
    let 
        maxIndex = fst $ V.last edges
        gr = createGraph maxIndex edges
        res = dfsOuter gr
    --return gr
    putStrLn $ show res
dfsOuter gr = 
    let tmp = V.foldr' callInner (gr,[]) gr
    in snd tmp
callInner :: Node -> (Graph, Stack) -> (Graph, Stack)
callInner (Node idx _ fwd bwd) (gr,acc) = 
    let (Node _ explored _ _) = gr V.! idx
    in case explored of
        True  -> (gr, acc)
        False ->
            let
                initialStack = map (l -> (idx, l)) bwd
                gr' = gr V.// [(idx, Node idx True fwd bwd)]
                (gr'', newScc) = dfsInner idx initialStack (length acc) (gr', [])
            in (gr'', newScc++acc)
dfsInner :: NodeName -> Stack -> Int -> (Graph, [(Int, Int)]) -> (Graph, [(Int, Int)])
dfsInner start [] finishCounter (gr, acc) = (gr, (start, finishCounter):acc)
dfsInner start stack finishCounter (gr, acc)
    | nextStart /= start =                      -- no more places to go from this node
        dfsInner nextStart stack (finishCounter + 1) $ (gr, (start, finishCounter):acc)
    | nextExplored = 
-- nextExplored || any ((y,_) -> y == stack0Head) stack || any ((x,_) -> x == stack0Head) acc =
        dfsInner start (tail stack) finishCounter (gr, acc)
    | otherwise =
        dfsInner nextEnd (add2Stack++stack) finishCounter (gr V.// [(nextEnd, Node idx True nextLHS nextRHS)], acc)
--      dfsInner gr stack0Head (add2Stack++stack) finishCounter acc
    where
        (nextStart, nextEnd) = head stack
        (Node idx nextExplored nextLHS nextRHS) = gr V.! nextEnd
        add2Stack = map (l -> (nextEnd, l)) nextRHS

一句话:

了解时间复杂度

有很多优化点,其中很大一部分在日常编程中不是很重要,但如果不了解渐近复杂性,程序通常会根本不起作用

Haskell库通常记录复杂性,特别是当它不明显或不有效(线性或更糟)时。特别是,与这个问题有关的所有复杂性都可以在Data.ListData.Vector中找到。

性能在这里被V.//杀死。向量是内存中已装箱或未装箱的不可变连续数组。因此,修改它们需要复制整个向量。因为我们有O(N)个这样的修改,所以整个算法是O(N ^2),所以我们必须复制大约2tb, N = 500000。因此,在向量内标记访问节点并没有多大用处。相反,根据需要构建索引的IntSet

initialStack (length acc)看起来也很糟糕。在大列表上使用length几乎从来都不是一个好主意,因为它也是O(n)。它可能不像代码中的//那么糟糕,因为它位于一个相对很少发生的分支中,但在我们纠正了向量问题之后,它仍然会使性能受损。

而且,搜索实现对我来说似乎相当不清楚和过于复杂。将Wiki页面上的伪代码翻译成字面意思应该是一个好的开始。此外,没有必要将索引存储在节点中,因为它们可以从向量位置和邻接表中确定。

根据@andras的要点,我重写了我的代码如下:我没有使用箭头函数,因为我不熟悉它们,我的第二次深度搜索在风格上与第一次相同(而不是@Andras filterM方法)。最终的结果是,它完成的时间是Andras代码的20%(21秒而不是114秒)。

import qualified Data.Vector as V
import qualified Data.IntSet as IS
import qualified Data.ByteString.Char8 as BS
import Data.List
import Control.Monad
import Control.Monad.State
--import Criterion.Main
--getEdges :: String -> IO [(Int, Int)]
getEdges file = do
    lines <- (map BS.words . BS.lines) `fmap` BS.readFile file
    let 
        pairs = (map . map) (maybe (error "can't read Int") fst . BS.readInt) lines
        pairs' = [(a, b) | [a, b] <- pairs]         -- adds 9 seconds
        maxIndex = fst $ last pairs'
        graph = createGraph maxIndex pairs'
    return graph
main = do
    graph <- getEdges "SCC.txt"
    --let 
        --maxIndex = fst $ V.last edges
    let 
        fts = bwdLoop graph
        leaders = fst $ execState (fwdLoop graph fts) ([], IS.empty)
    print $ length leaders
type Connections = [Int]
data Node = Node {fwd, bwd :: Connections} deriving (Show)
type Graph = V.Vector Node
type Visited = IS.IntSet
type FinishTime = Int
type FinishTimes = [FinishTime]
type Leaders = [Int]
createGraph :: Int -> [(Int, Int)] -> Graph
createGraph maxIndex pairs = 
    let
        graph  = V.replicate (maxIndex+1) (Node [] [])
        graph' = V.accum ((Node f b) x -> Node (x:f) b) graph  pairs
    in           V.accum ((Node f b) x -> Node f (x:b)) graph' $ map ((a,b) -> (b,a)) pairs
bwdLoop :: Graph -> FinishTimes
bwdLoop g = fst $ execState (mapM_ go $ reverse [0 .. V.length g - 1]) ([], IS.empty) where
    go :: Int -> State (FinishTimes, Visited) ()
    go i = do
        (fTimes, vs) <- get
        let visited = IS.member i vs
        if not visited then do
            put (fTimes, IS.insert i vs)
            mapM_ go $ bwd $ g V.! i
            -- get state again after changes from mapM_
            (fTimes', vs') <- get
            put (i : fTimes', vs')
        else return ()
fwdLoop :: Graph -> FinishTimes -> State (Leaders, Visited) ()
fwdLoop _ [] = return ()
fwdLoop g (i:fts) = do
    (ls, vs) <- get
    let visited = IS.member i vs
    if not visited then do
        put (i:ls, IS.insert i vs)
        mapM_ go $ fwd $ g V.! i
    else return ()
    fwdLoop g fts
    where
        go :: Int -> State (Leaders, Visited) ()
        go i = do
            (ls, vs) <- get
            let visited = IS.member i vs
            if not visited then do
                put (ls, IS.insert i vs)
                mapM_ go $ fwd $ g V.! i
            else return ()

最新更新