为什么我的Mapreduce实现(现实世界的haskell)使用迭代IO也失败了"Too many open files"



我正在实现一个haskell程序,它将文件中的每一行与文件中的其他行进行比较。可以实现单线程,如下所示

distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
              fileContents <- readFile path
              return $ allDistances $ map read $ lines $ fileContents
              where
                  allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
                  allDistances _ = 0

这将在O(n^2)时间内运行,并且必须在整个时间内将完整的整数列表保存在内存中。在我的实际程序中,这一行包含更多的数字,我从中构造了一个比Int稍微复杂的数据类型。这导致我必须处理的数据出现内存不足错误。

所以对于上面提到的单线程解决方案有两个改进要做。第一,加快实际运行时间。其次,找到一种不将整个列表一直保存在内存中的方法。我知道这需要解析整个文件n次。因此将会有O(n^2)次比较,并解析O(n^2)行。这对我来说没问题,因为我宁愿有一个缓慢成功的程序,也不愿有一个失败的程序。当输入文件足够小时,我总是可以使用更简单的版本。

为了使用多个cpu内核,我从真实世界的Haskell(第24章,可在这里获得)中提取了Mapreduce实现。

我修改了书的分块函数,不再将整个文件分成块,而是返回与行一样多的块,每个块代表

的一个元素。
tails . lines . readFile

因为我希望程序在文件大小上也是可伸缩的,所以我最初使用了惰性IO。然而,这在"打开的文件太多"的情况下失败了,我在前面的问题中问过这个问题(文件句柄被GC处理得太晚了)。完整的延迟IO版本张贴在那里。

正如公认的答案所解释的,strict IO可以解决这个问题。这确实解决了"打开的文件太多"的问题。2k行文件出现问题,但显示"内存不足"失败;在一个50k的文件上

请注意,第一个单线程实现(没有mapreduce)能够处理50k的文件。 另一种解决方案,也是最吸引我的,是使用iteratee IO。我希望这可以解决文件句柄和内存资源耗尽问题。然而,我的实现仍然失败了"太多打开的文件"2k行文件错误。

iteratee IO版本具有与书中相同的mapReduce函数,但修改了chunkedFileEnum以使其与枚举器一起工作。

所以我的问题是;下面的迭代IO基实现有什么问题?懒惰在哪里?

import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO
import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)
import Data.Text(Text)
import Data.Text.Read
import Data.Maybe
import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)
--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances
--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
combineDistances :: [Int] -> Int
combineDistances = sum
--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
        where infiniteList :: Int->Int-> [Int]
              infiniteList i j = (i + j) : infiniteList j (i+j)
--Applying my operation simply on a file 
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But I want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
                  fileContents <- readFile path
                  return $ allDistances $ map read $ lines $ fileContents
                  where
                      allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x)    xs)
                      allDistances _ = 0
--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
                            where transformer input = case reader input of
                                         Right (val, remainder) -> return [val]
                                         Left err -> return [0]
readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)
--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
    maybeNum <- EL.head
    maybe (return 0) distancesOneToManyIt maybeNum
distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
    maybeNum <- EL.head
    maybe (return 0) combineNextDistance maybeNum
    where combineNextDistance nextNum = do
              rest <- distancesOneToManyIt base
              return $ combineDistances [(distance base nextNum),rest]
--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
          -> (a -> b)   -- map function
          -> Strategy c -- evaluation strategy for reduction
          -> ([b] -> c) -- reduce function
          -> [a]        -- list to map over
          -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
          mapResult `pseq` reduceResult
          where mapResult    = parMap mapStrat mapFunc input
                reduceResult = reduceFunc mapResult `using` reduceStrat
--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path
distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
                                      rpar (sumValuesAsReduceFunc)
                            where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
                                  runEnumeratorAsMapFunc = (source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
                                  sumValuesAsReduceFunc :: [IO Int] -> IO Int
                                  sumValuesAsReduceFunc = liftM sum . sequence
                              
--Working with (file)chunk enumerators:
data ChunkSpec = CS{
    chunkOffset :: !Int
    , chunkLength :: !Int
    } deriving (Eq,Show)
chunkedFileEnum ::   (NFData (a)) => MonadIO m =>
                (FilePath-> IO [ChunkSpec])
           ->   ([Enumerator Text m b]->IO a)
           ->   FilePath
           ->   IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
    (chunks, handles)<- chunkedEnum chunkCreator path
    r <- funcOnChunks chunks
    (rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles
chunkedEnum ::  MonadIO m=>
                (FilePath -> IO [ChunkSpec])
            ->  FilePath
            ->  IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ spec -> do
        h <- openFile path ReadMode
        hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
        let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
        return (chunk,h)
-- returns set of chunks representing  tails . lines . readFile 
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ h-> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
            let newOffset = offset + chunkSize
            hSeek h AbsoluteSeek (fromIntegral newOffset)
            let findNewline lineSeekOffset = do
                eof <- hIsEOF h
                if eof
                    then return [CS offset (totalSize - offset)]
                    else do
                        bytes <- Str.hGet h 256
                        case Str.elemIndex 'n' bytes of
                            Just n -> do
                                nextChunks <- findChunks (lineSeekOffset + n + 1)
                                return (CS offset (totalSize-offset):nextChunks)
                            Nothing -> findNewline (lineSeekOffset + Str.length bytes)
            findNewline newOffset
        findChunks 0
顺便说一句,我在跑步HaskellPlatform 2011.2.0 on Mac OS X 10.6.7 (snow leopard)
使用以下包:
字节串0.9.1.10
并行3.1.0.1
枚举数0.4.8,这里有一个手册

如错误提示,打开的文件太多。我期望Haskell按顺序运行大部分程序,但有些"火花"是并行的。然而,正如sclv所提到的,Haskell总是会激发评估。

这在纯函数式程序中通常不是问题,但在处理IO(资源)时就会出现问题。我把《真实世界Haskell》一书中描述的并行度放大得太大了。所以我的结论是,在处理spark中的IO资源时,只在有限的规模上进行并行。在纯函数部分,过度并行可能成功。

因此,我的帖子的答案是,不要在整个程序中使用MapReduce,而是在内部纯功能部分使用MapReduce。

为了显示程序实际失败的地方,我使用——enable-executable-profiling -p配置它,构建它,并使用+RTS -p -hc -L30运行它。因为可执行文件立即失败,所以没有内存分配配置文件。生成的.prof文件中的时间分配概要文件以以下内容开头:

                                                                                               individual    inherited
COST CENTRE              MODULE                                               no.    entries  %time %alloc   %time %alloc
MAIN                     MAIN                                                   1            0   0.0    0.3   100.0  100.0
  main                    Main                                                1648           2   0.0    0.0    50.0   98.9
    sumOfDistancesOnFileWithIt MapReduceTest                                  1649           1   0.0    0.0    50.0   98.9
      chunkedFileEnum       MapReduceTest                                     1650           1   0.0    0.0    50.0   98.9
        chunkedEnum          MapReduceTest                                    1651         495   0.0   24.2    50.0   98.9
          lineOffsets         MapReduceTest                                   1652           1  50.0   74.6    50.0   74.6

chunkedEnum返回IO ([Enumerator Text m b], [Handle]),显然接收到495个条目。输入文件是一个2k行的文件,因此lineOffsets上的单个条目返回一个包含2000个偏移量的列表。在distanceusingmapreduceit中没有一个条目,所以实际的工作甚至没有开始!

相关内容

  • 没有找到相关文章

最新更新