Запуск параллельных загрузок URL-адресов в Haskell
Ниже приведен код Haskell, который (HTTP) загружает файлы, отсутствующие в данном каталоге:
module Main where
import Control.Monad ( filterM
, liftM
)
import Data.Maybe ( fromJust )
import Network.HTTP ( RequestMethod(GET)
, rspBody
, simpleHTTP
)
import Network.HTTP.Base ( Request(..) )
import Network.URI ( parseURI )
import System.Directory ( doesFileExist )
import System.Environment ( getArgs )
import System.IO ( hClose
, hPutStr
, hPutStrLn
, IOMode(WriteMode)
, openFile
, stderr
)
import Text.Printf ( printf )
indices :: [String]
indices =
map format1 [0..9] ++ map format2 [0..14] ++ ["40001-41284" :: String]
where
format1 index =
printf "%d-%d" ((index * 1000 + 1) :: Int)
(((index + 1) * 1000) :: Int)
format2 index =
printf "%d-%d" ((10000 + 2 * index * 1000 + 1) :: Int)
((10000 + (2 * index + 2) * 1000) :: Int)
main :: IO ()
main = do
[dir] <- getArgs
updateDownloads dir
updateDownloads :: FilePath -> IO ()
updateDownloads path = do
let
fileNames = map (\index ->
(index, path ++ "/tv_and_movie_freqlist" ++ index ++ ".html")) indices
missing <-
filterM (\(_, fileName) -> liftM not $ doesFileExist fileName) fileNames
mapM_ (\(index, fileName) -> do
let
url =
"http://en.wiktionary.org/wiki/Wiktionary:Frequency_lists/TV/2006/" ++
index
request =
Request
{ rqURI = fromJust $ parseURI url
, rqMethod = GET
, rqHeaders = []
, rqBody = ""
}
hPutStrLn stderr $ "Downloading " ++ show url
resp <- simpleHTTP request
case resp of
Left _ -> hPutStrLn stderr $ "Error connecting to " ++ show url
Right response -> do
let
html = rspBody response
file <- openFile fileName WriteMode
hPutStr file html
hClose file
return ()) missing
Я хотел бы запускать загрузки параллельно. Я знаю о par
, но не уверен, что его можно использовать в монаде IO
, и если да, то как?
ОБНОВЛЕНИЕ: Вот мой код, переопределенный с помощью Control.Concurrent.Async
и mapConcurrently
:
module Main where
import Control.Concurrent.Async ( mapConcurrently )
import Control.Monad ( filterM
, liftM
)
import Data.Maybe ( fromJust )
import Network.HTTP ( RequestMethod(GET)
, rspBody
, simpleHTTP
)
import Network.HTTP.Base ( Request(..) )
import Network.URI ( parseURI )
import System.Directory ( doesFileExist )
import System.Environment ( getArgs )
import System.IO ( hClose
, hPutStr
, hPutStrLn
, IOMode(WriteMode)
, openFile
, stderr
)
import Text.Printf ( printf )
indices :: [String]
indices =
map format1 [0..9] ++ map format2 [0..14] ++ ["40001-41284" :: String]
where
format1 index =
printf "%d-%d" ((index * 1000 + 1) :: Int)
(((index + 1) * 1000) :: Int)
format2 index =
printf "%d-%d" ((10000 + 2 * index * 1000 + 1) :: Int)
((10000 + (2 * index + 2) * 1000) :: Int)
main :: IO ()
main = do
[dir] <- getArgs
updateDownloads dir
updateDownloads :: FilePath -> IO ()
updateDownloads path = do
let
fileNames = map (\index ->
(index, path ++ "/tv_and_movie_freqlist" ++ index ++ ".html")) indices
missing <-
filterM (\(_, fileName) -> liftM not $ doesFileExist fileName) fileNames
pages <-
mapConcurrently (\(index, fileName) -> getUrl index fileName) missing
mapM_ (\(fileName, html) -> do
handle <- openFile fileName WriteMode
hPutStr handle html
hClose handle) pages
where
getUrl :: String -> FilePath -> IO (FilePath, String)
getUrl index fileName = do
let
url =
"http://en.wiktionary.org/wiki/Wiktionary:Frequency_lists/TV/2006/" ++
index
request =
Request
{ rqURI = fromJust $ parseURI url
, rqMethod = GET
, rqHeaders = []
, rqBody = ""
}
resp <- simpleHTTP request
case resp of
Left _ -> do
hPutStrLn stderr $ "Error connecting to " ++ show url
return ("", "")
Right response ->
return (fileName, rspBody response)
Ответы
Ответ 1
Посмотрите mapConcurrently
из библиотеки "async" Саймона Марлоу.
Он сопоставляет действие IO
параллельно и асинхронно с элементами контейнера Traversable
и ждет всех действий.
Пример:
{-# LANGUAGE PackageImports #-}
import System.Environment (getArgs)
import "async" Control.Concurrent.Async (mapConcurrently)
import "HTTP" Network.HTTP
import "HTTP" Network.Stream (Result)
import "HTTP" Network.HTTP.Base (Response(..))
import System.IO
import "url" Network.URL (encString)
import Control.Monad
getURL :: String -> IO (String, Result (Response String))
getURL url = do
res <- (simpleHTTP . getRequest) url
return (url, res)
main = do
args <- getArgs
case args of
[] -> putStrLn "usage: program url1 url2 ... urlN"
args -> do
results <- mapConcurrently getURL args
forM_ results $ \(url, res) -> do
case res of
Left connError -> putStrLn $ url ++ "; " ++ show connError
Right response -> do
putStrLn $ url ++ "; OK"
let content = rspBody response
-- make name from url
fname = encString True (`notElem` ":/") url ++ ".html"
writeFile fname content
Ответ 2
Это похоже на то, что async
предназначено, на самом деле примером является параллельная загрузка. Также есть презентация - http://skillsmatter.com/podcast/home/high-performance-concurrency - стоит проверить.
Ответ 3
Поскольку операции связаны с IO, для этого обычно используется /not/use par
, поскольку он ничего не делает для операций ввода-вывода.
Вам понадобится явная модель concurrency, чтобы скрыть задержку загрузки.
Я бы порекомендовал MVars или TVars в сочетании с forkIO.
Аббревиатура рабочей очереди часто полезна для этого стиля проблемы: нажимайте все URL-адреса в очередь и фиксируйте набор рабочих потоков (например, N * k) для N ядер, выполняйте задания до завершения. Затем завершенные работы будут добавлены к каналу связи, переданному в основной поток.
Вот пример из параллельной проверки URL, используя каналы.
http://code.haskell.org/~dons/code/urlcheck/Check.hs
Ответ 4
Другая версия, которая использует async mapConcurrently и http-conduit keep- живой менеджер
{-# LANGUAGE PackageImports, FlexibleContexts #-}
import System.Environment (getArgs)
import "http-conduit" Network.HTTP.Conduit
import qualified "conduit" Data.Conduit as C
import "http-types" Network.HTTP.Types.Status (ok200)
import "async" Control.Concurrent.Async (mapConcurrently)
import qualified "bytestring" Data.ByteString.Lazy as LBS
import qualified "bytestring" Data.ByteString as BS
import "transformers" Control.Monad.Trans.Class (lift)
import "transformers" Control.Monad.IO.Class (liftIO)
import "url" Network.URL (encString)
import "failure" Control.Failure (Failure(..))
import Control.Monad
import System.IO
taggedRequest :: Failure HttpException m => String -> m (String, Request m')
taggedRequest url = do
req <- parseUrl url
return (url, req)
taggedResult :: (C.MonadBaseControl IO m, C.MonadResource m) => Manager -> (String, Request m) -> m (String, Response LBS.ByteString)
taggedResult manager (url, req) = do
res <- httpLbs req manager
return (url, res)
main = do
args <- getArgs
case args of
[] -> putStrLn "usage: program url1 url2 ... urlN"
args -> do
requests <- mapM taggedRequest args
withManager $ \manager -> liftIO $ do
results <- mapConcurrently (C.runResourceT . taggedResult manager) requests
forM_ results $ \(url, Response status _ _ bsBody) -> do
putStrLn $ url ++ " ; " ++ show status
let fileName = encString True (`notElem` ":/") url ++ ".html"
when (status == ok200) $ LBS.writeFile fileName bsBody