Почему моя реализация Mapreduce (реальный мир haskell) с использованием iteratee IO также терпит неудачу с "Слишком много открытых файлов",
Я реализую программу haskell, которая сравнивает каждую строку файла с каждой другой линией в файле. Который может быть реализован с одиночной резьбой следующим образом
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
fileContents <- readFile path
return $ allDistances $ map read $ lines $ fileContents
where
allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
allDistances _ = 0
Это будет работать в O (n ^ 2) времени и должно содержать полный список целых чисел в памяти все время. В моей реальной программе строка содержит больше чисел, из которых я строю немного сложный тип данных, чем Int. Это привело к ошибкам памяти для данных, которые я должен обработать.
Таким образом, есть два улучшения в вышеупомянутом однопоточном решении. Во-первых, ускорьте фактическое время работы. Во-вторых, найдите способ не хранить весь список в памяти на полный рабочий день. Я знаю, что это требует разбора полного файла n раз. Таким образом, будет проведено сравнение O (n ^ 2) и проанализированы строки O (n ^ 2). Это нормально для меня, поскольку я предпочел бы медленную успешную программу, чем неудачная программа. Когда входной файл достаточно мал, я всегда могу найти более простую версию.
Чтобы использовать несколько ядер процессора, я взял реализацию Mapreduce из Real World Haskell (глава 24, доступная здесь).
Я изменил функцию chunking из книги, вместо того, чтобы делить полный файл на куски, вернуть столько кусков, сколько строк с каждым фрагментом, представляющим один элемент
tails . lines . readFile
Поскольку я хочу, чтобы программа также была масштабируемой в размере файла, я изначально использовал lazy IO. Однако это приводит к сбою "Слишком много открытых файлов" , о чем я спросил в предыдущем вопросе (файлы обрабатывались слишком поздно GC). Полная ленивая версия IO размещена там.
Как объясняет принятый ответ, строгий IO может решить проблему. Это действительно решает проблему "Слишком много открытых файлов" для файлов строк 2k, но не получается "из памяти" в файле 50k.
Обратите внимание, что первая реализация однопоточная (без mapreduce) способна обрабатывать файл 50k.
Альтернативное решение, которое мне больше всего нравится, заключается в использовании iteratee IO. Я ожидал, что это решит как дескриптор файла, так и ресурс памяти. Однако моя реализация все еще не выполняется с ошибкой "Слишком много открытых файлов" в файле строки 2k.
Итерационная версия IO имеет ту же функцию mapReduce, что и в книге, но имеет модифицированный chunkedFileEnum, позволяющий ей работать с Enumerator.
Таким образом, мой вопрос: что не так со следующей итерационной базой IO? Где Лень?.
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO
import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)
import Data.Text(Text)
import Data.Text.Read
import Data.Maybe
import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)
--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances
--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
combineDistances :: [Int] -> Int
combineDistances = sum
--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
where infiniteList :: Int->Int-> [Int]
infiniteList i j = (i + j) : infiniteList j (i+j)
--Applying my operation simply on a file
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But i want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
fileContents <- readFile path
return $ allDistances $ map read $ lines $ fileContents
where
allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
allDistances _ = 0
--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
where transformer input = case reader input of
Right (val, remainder) -> return [val]
Left err -> return [0]
readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)
--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
maybeNum <- EL.head
maybe (return 0) distancesOneToManyIt maybeNum
distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
maybeNum <- EL.head
maybe (return 0) combineNextDistance maybeNum
where combineNextDistance nextNum = do
rest <- distancesOneToManyIt base
return $ combineDistances [(distance base nextNum),rest]
--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
-> (a -> b) -- map function
-> Strategy c -- evaluation strategy for reduction
-> ([b] -> c) -- reduce function
-> [a] -- list to map over
-> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
mapResult `pseq` reduceResult
where mapResult = parMap mapStrat mapFunc input
reduceResult = reduceFunc mapResult `using` reduceStrat
--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path
distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
rpar (sumValuesAsReduceFunc)
where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
sumValuesAsReduceFunc :: [IO Int] -> IO Int
sumValuesAsReduceFunc = liftM sum . sequence
--Working with (file)chunk enumerators:
data ChunkSpec = CS{
chunkOffset :: !Int
, chunkLength :: !Int
} deriving (Eq,Show)
chunkedFileEnum :: (NFData (a)) => MonadIO m =>
(FilePath-> IO [ChunkSpec])
-> ([Enumerator Text m b]->IO a)
-> FilePath
-> IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
(chunks, handles)<- chunkedEnum chunkCreator path
r <- funcOnChunks chunks
(rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles
chunkedEnum :: MonadIO m=>
(FilePath -> IO [ChunkSpec])
-> FilePath
-> IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
chunks <- chunkCreator path
liftM unzip . forM chunks $ \spec -> do
h <- openFile path ReadMode
hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
return (chunk,h)
-- returns set of chunks representing tails . lines . readFile
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
bracket (openFile path ReadMode) hClose $ \h-> do
totalSize <- fromIntegral `liftM` hFileSize h
let chunkSize = 1
findChunks offset = do
let newOffset = offset + chunkSize
hSeek h AbsoluteSeek (fromIntegral newOffset)
let findNewline lineSeekOffset = do
eof <- hIsEOF h
if eof
then return [CS offset (totalSize - offset)]
else do
bytes <- Str.hGet h 256
case Str.elemIndex '\n' bytes of
Just n -> do
nextChunks <- findChunks (lineSeekOffset + n + 1)
return (CS offset (totalSize-offset):nextChunks)
Nothing -> findNewline (lineSeekOffset + Str.length bytes)
findNewline newOffset
findChunks 0
Btw, я бегу
HaskellPlatform 2011.2.0 на Mac OS X 10.6.7 (снежный барс)
со следующими пакетами:
bytestring 0.9.1.10
параллельный 3.1.0.1
перечислитель 0.4.8, с руководством здесь
Ответы
Ответ 1
Как говорится в ошибке, слишком много открытых файлов. Я ожидал, что Haskell будет запускать большую часть программы последовательно, но некоторые "искры" параллельны. Однако, как упоминалось выше, Haskell всегда искривляет оценки.
Это обычно не проблема в чистой функциональной программе, но это когда вы работаете с IO (ресурсами). Я масштабировал parallelism, как описано в книге Real World Haskell слишком далеко. Поэтому мой вывод состоит в том, чтобы сделать parallelism только в ограниченном масштабе при работе с ресурсами ввода-вывода в искрых. В чистой функциональной части может произойти чрезмерное parallelism.
Таким образом, ответ на мой пост - не использовать MapReduce для всей программы, а внутри внутренней чистой функциональной части.
Чтобы показать, где программа действительно не удалось, я настроил ее с помощью -enable-executable-profiling -p, построил ее и запустил с помощью + RTS -p -hc -L30. Поскольку исполняемый файл не работает немедленно, профиль распределения памяти отсутствует. Результирующий профиль распределения времени в файле .prof начинается со следующего:
individual inherited
COST CENTRE MODULE no. entries %time %alloc %time %alloc
MAIN MAIN 1 0 0.0 0.3 100.0 100.0
main Main 1648 2 0.0 0.0 50.0 98.9
sumOfDistancesOnFileWithIt MapReduceTest 1649 1 0.0 0.0 50.0 98.9
chunkedFileEnum MapReduceTest 1650 1 0.0 0.0 50.0 98.9
chunkedEnum MapReduceTest 1651 495 0.0 24.2 50.0 98.9
lineOffsets MapReduceTest 1652 1 50.0 74.6 50.0 74.6
chunkedEnum возвращает IO ([Enumerator Text m b], [Handle]) и, по-видимому, получает 495 записей. Входной файл был файлом строки 2k, поэтому единственная запись в lineOffsets вернула список смещений 2000. В distancesUsingMapReduceIt нет ни одной записи, поэтому фактическая работа даже не началась!