2016-12-23 2 views
16

Я написал библиотеку под названием amqp-worker, которая предоставляет функцию с именем worker, которая опросает очередь сообщений (например, RabbitMQ) для сообщений, вызывая обработчик при обнаружении сообщения. Затем он возвращается к опросу.Утечка памяти в рекурсивной функции ввода-вывода - PAP

Это утечка памяти. Я профилировал его, и график говорит, что виновником является PAP (приложение частичной функции). Где утечка в моем коде? Как я могу избежать утечек при зацикливании в IO с forever?

enter image description here

Вот некоторые важные функции. The full source is here.

Example Program. Эта утечка

main :: IO() 
main = do 
    -- connect 
    conn <- Worker.connect (fromURI "amqp://guest:[email protected]:5672") 

    -- initialize the queues 
    Worker.initQueue conn queue 
    Worker.initQueue conn results 

    -- publish a message 
    Worker.publish conn queue (TestMessage "hello world") 

    -- create a worker, the program loops here 
    Worker.worker def conn queue onError (onMessage conn) 

worker

worker :: (FromJSON a, MonadBaseControl IO m, MonadCatch m) => WorkerOptions -> Connection -> Queue key a -> (WorkerException SomeException -> m()) -> (Message a -> m()) -> m() 
worker opts conn queue onError action = 
    forever $ do 
    eres <- consumeNext (pollDelay opts) conn queue 
    case eres of 
     Error (ParseError reason bd) -> 
     onError (MessageParseError bd reason) 

     Parsed msg -> 
     catch 
      (action msg) 
      (onError . OtherException (body msg)) 
    liftBase $ threadDelay (loopDelay opts) 

consumeNext

consumeNext :: (FromJSON msg, MonadBaseControl IO m) => Microseconds -> Connection -> Queue key msg -> m (ConsumeResult msg) 
consumeNext pd conn queue = 
    poll pd $ consume conn queue 

poll

poll :: (MonadBaseControl IO m) => Int -> m (Maybe a) -> m a 
poll us action = do 
    ma <- action 
    case ma of 
     Just a -> return a 
     Nothing -> do 
     liftBase $ threadDelay us 
     poll us action 
+0

Что ваша версия ghc и как вы компилируете? – jberryman

+1

Он установлен в lts-7.3, так что GHC 8.0.1. Я собираюсь с установкой стека --profile. Но я получаю утечку памяти с нормальной установкой стека. Использование параметров ghc по умолчанию из шаблона стека: -threaded -rtsopts -with-rtsopts = -N –

+2

Этот пример очень далек от минимального - вы импортируете всю свою библиотеку ('Network.AMQP.Worker') в свою примерную программу. Как бы то ни было, это слишком широко. – user2407038

ответ

14

Вот очень простой пример, который демонстрирует вашу проблему:

main :: IO() 
main = worker 

{-# NOINLINE worker #-} 
worker :: (Monad m) => m() 
worker = 
    let loop = poll >> loop 
    in loop 

poll :: (Monad m) => m a 
poll = return() >> poll 
If you remove the `NOINLINE`, or specialize `m` to 
`IO` (while compiling with `-O`), the leak goes away. 

Я написал подробный blog post о том, почему именно этот код утечки памяти. Краткое резюме, как Рейд указывает в своем ответе , что код создает и запоминает цепочку частичных приложений >> s.

Я также подал ghc ticket об этом.

3

Утечка памяти была в poll. Используя monad-loops, я изменил определение на следующее: похоже, что untilJust делает то же самое, что и моя рекурсия, но исправляет утечку.

Может кто-нибудь комментировать, почему мое предыдущее определение poll было утечкой памяти?

{-# LANGUAGE FlexibleContexts #-} 

module Network.AMQP.Worker.Poll where 

import Control.Concurrent (threadDelay) 
import Control.Monad.Trans.Control (MonadBaseControl) 
import Control.Monad.Base (liftBase) 
import Control.Monad.Loops (untilJust) 

poll :: (MonadBaseControl IO m) => Int -> m (Maybe a) -> m a 
poll us action = untilJust $ do 
    ma <- action 
    case ma of 
     Just a -> return $ Just a 
     Nothing -> do 
     liftBase $ threadDelay us 
     return Nothing 
4

Может быть, проще пример, чтобы понять это один

main :: IO() 
main = let c = count 0 
     in c >> c 

{-# NOINLINE count #-} 
count :: Monad m => Int -> m() 
count 1000000 = return() 
count n = return() >> count (n+1) 

Оценка f >> g для действий ввода-вывода Урожайность своего рода замыканию, что имеет ссылки на обоих f и g (это в основном состав f и g как функции на государственные жетоны). count 0 возвращает thunk c, который будет оценивать большую структуру закрытий формы return() >> return() >> return() >> .... Когда мы выполняем c, мы создаем эту структуру, и поскольку мы должны выполнить c второй раз, вся структура все еще жива. Таким образом, эта программа утечки памяти (независимо от флагов оптимизации).

Когда count специализируется на IO, и оптимизация включена, GHC предлагает множество трюков, чтобы избежать создания этой структуры данных; но все они полагаются на то, что монада IO.

Возвращаясь к исходной count :: Monad m => Int -> m(), мы можем попытаться не строить эту большую структуру, изменив последнюю строку на

count n = return() >>= (\_ -> count (n+1)) 

Теперь рекурсивный вызов скрыт внутри лямбды, так c лишь небольшая структура return() >>= (\_ -> BODY) , Это фактически предотвращает утечку пространства при компиляции без оптимизации. Однако при оптимизации включены, GHC выплывает count (n+1) из тела лямбды (так как он не зависит от аргумента) производящего

count n = return() >>= (let body = count (n+1) in \_ -> body) 

и в настоящее время c большая структура снова ...

+0

Каким образом использование «NOINLINE» делает программу сравнимой с исходной негерметичной? – Michael

+0

GHC, не встроенный или специализирующий, является общим случаем (когда функция определена в другом модуле, а не маленьком и т. Д.). GHC знает много трюков, и когда вы минимизируете эти трюки, можете ударить. Использование 'NOINLINE' останавливает многие из этих трюки позволяют сэкономить время. –