Утечка памяти в рекурсивной функции ввода-вывода - PAP

Я написал библиотеку под названием amqp-worker, которая предоставляет функцию под названием worker, которая опросает очередь сообщений (например, RabbitMQ) для сообщений, вызывая обработчик при обнаружении сообщения. Затем он возвращается к опросу.

Это утечка памяти. Я профилировал его, и график говорит, что PAP (приложение частичной функции) является виновником. Где утечка в моем коде? Как я могу избежать утечек при циклировании в IO с помощью forever?

введите описание изображения здесь

Вот некоторые соответствующие функции. Полный источник здесь.

Пример программы. Это утечка

main :: IO ()
main = do
  -- connect
  conn <- Worker.connect (fromURI "amqp://guest:[email protected]:5672")

  -- initialize the queues
  Worker.initQueue conn queue
  Worker.initQueue conn results

  -- publish a message
  Worker.publish conn queue (TestMessage "hello world")

  -- create a worker, the program loops here
  Worker.worker def conn queue onError (onMessage conn)

worker

worker :: (FromJSON a, MonadBaseControl IO m, MonadCatch m) => WorkerOptions -> Connection -> Queue key a -> (WorkerException SomeException -> m ()) -> (Message a -> m ()) -> m ()
worker opts conn queue onError action =
  forever $ do
    eres <- consumeNext (pollDelay opts) conn queue
    case eres of
      Error (ParseError reason bd) ->
        onError (MessageParseError bd reason)

      Parsed msg ->
        catch
          (action msg)
          (onError . OtherException (body msg))
    liftBase $ threadDelay (loopDelay opts)

consumeNext

consumeNext :: (FromJSON msg, MonadBaseControl IO m) => Microseconds -> Connection -> Queue key msg -> m (ConsumeResult msg)
consumeNext pd conn queue =
    poll pd $ consume conn queue

poll

poll :: (MonadBaseControl IO m) => Int -> m (Maybe a) -> m a
poll us action = do
    ma <- action
    case ma of
      Just a -> return a
      Nothing -> do
        liftBase $ threadDelay us
        poll us action

Ответы

Ответ 1

Вот очень простой пример, демонстрирующий вашу проблему:

main :: IO ()
main = worker

{-# NOINLINE worker #-}
worker :: (Monad m) => m ()
worker =
  let loop = poll >> loop
  in loop

poll :: (Monad m) => m a
poll = return () >> poll
If you remove the `NOINLINE`, or specialize `m` to
`IO` (while compiling with `-O`), the leak goes away.

Я написал подробный блог сообщение о том, почему именно этот код утечки памяти. Краткое изложение, как указывает Рейд в своем ответ, что код создает и запоминает цепочку частичных приложений >> с.

Я также подал ghc ticket об этом.

Ответ 2

Может быть, проще понять, что этот

main :: IO ()
main = let c = count 0
       in c >> c

{-# NOINLINE count #-}
count :: Monad m => Int -> m ()
count 1000000 = return ()
count n = return () >> count (n+1)

Оценка f >> g для операций ввода-вывода приводит к некоторому закрытию, которое имеет ссылки как на f, так и на g (это в основном состав f и g как функции на токенах состояния). count 0 возвращает thunk c, который будет оценивать большую структуру замыканий формы return () >> return () >> return () >> .... Когда мы выполняем c, мы создаем эту структуру, и поскольку мы должны выполнить c во второй раз, вся структура остается в живых. Таким образом, эта программа утечки памяти (независимо от флагов оптимизации).

Когда count специализируется на IO, и оптимизация включена, GHC предлагает множество трюков, чтобы избежать создания этой структуры данных; но все они полагаются на то, что монада IO.

Возвращаясь к исходному count :: Monad m => Int -> m (), мы можем попытаться избежать создания этой большой структуры, изменив последнюю строку на

count n = return () >>= (\_ -> count (n+1))

Теперь рекурсивный вызов скрыт внутри лямбда, поэтому c представляет собой небольшую структуру return () >>= (\_ -> BODY). Это фактически предотвращает утечку пространства при компиляции без оптимизации. Однако, когда оптимизация включена, GHC плавает count (n+1) из тела лямбда (поскольку он не зависит от аргумента), создавая

count n = return () >>= (let body = count (n+1) in \_ -> body)

и теперь c снова является большой структурой...

Ответ 3

Утечка памяти находилась в poll. Используя monad-loops, я изменил определение на следующее: похоже, что untilJust выполняет то же самое, что и моя рекурсия, но исправляет протечь.

Может кто-нибудь комментировать, почему мое предыдущее определение poll было утечкой памяти?

{-# LANGUAGE FlexibleContexts #-}

module Network.AMQP.Worker.Poll where

import Control.Concurrent (threadDelay)
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad.Base (liftBase)
import Control.Monad.Loops (untilJust)

poll :: (MonadBaseControl IO m) => Int -> m (Maybe a) -> m a
poll us action = untilJust $ do
    ma <- action
    case ma of
      Just a -> return $ Just a
      Nothing -> do
        liftBase $ threadDelay us
        return Nothing