2013-04-22 10 views
10

Рассмотрите этот код (взято из here и модифицировано для использования байтов, а не строк символов).Как использовать IO с Scalaz7 Iteratees без переполнения стека?

import java.io.{ File, InputStream, BufferedInputStream, FileInputStream } 
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ } 
import std.list._ 

object IterateeIOExample { 
    type ErrorOr[+A] = EitherT[IO, Throwable, A] 

    def openStream(f: File) = IO(new BufferedInputStream(new FileInputStream(f))) 
    def readByte(s: InputStream) = IO(Some(s.read()).filter(_ != -1)) 
    def closeStream(s: InputStream) = IO(s.close()) 

    def tryIO[A, B](action: IO[B]) = I.iterateeT[A, ErrorOr, B] { 
    EitherT(action.catchLeft).map(r => I.sdone(r, I.emptyInput)) 
    } 

    def enumBuffered(r: => BufferedInputStream) = new EnumeratorT[Int, ErrorOr] { 
    lazy val reader = r 
    def apply[A] = (s: StepT[Int, ErrorOr, A]) => s.mapCont(k => 
     tryIO(readByte(reader)) flatMap { 
     case None => s.pointI 
     case Some(byte) => k(I.elInput(byte)) >>== apply[A] 
     }) 
    } 

    def enumFile(f: File) = new EnumeratorT[Int, ErrorOr] { 
    def apply[A] = (s: StepT[Int, ErrorOr, A]) => 
     tryIO(openStream(f)).flatMap(stream => I.iterateeT[Int, ErrorOr, A](
     EitherT(
      enumBuffered(stream).apply(s).value.run.ensuring(closeStream(stream))))) 
    } 

    def main(args: Array[String]) { 
    val action = (
     I.consume[Int, ErrorOr, List] &= 
     enumFile(new File(args(0)))).run.run 
    println(action.unsafePerformIO()) 
    } 
} 

Выполнение этого кода на приличном размере файла (8 КБ) производит StackOverflowException. Некоторые поиски показали, что исключение можно избежать, используя монадию Trampoline вместо IO, но это не похоже на отличное решение - пожертвовать функциональной чистотой, чтобы программа полностью завершилась. Очевидный способ исправить это - использовать IO или Trampoline в качестве Monad Transformer для переноса другого, но я не могу найти реализацию трансформаторной версии любого из них, и мне не хватает гуру функционального программирования для знаю, как писать свои собственные (более подробная информация о FP является одной из целей этого проекта, но я подозреваю, что создание новых монадных трансформаторов немного выше моего уровня на данный момент). Я полагаю, что я мог бы просто включить большое действие IO вокруг создания, запуска и возврата результата моих итераций, но это похоже на более обходное решение, чем решение.

Предположительно некоторые монады не могут быть преобразованы в трансформаторы монады, поэтому я хотел бы знать, можно ли работать с большими файлами без сброса ввода-вывода или переполнения стека, и если да, то как?

Бонусный вопрос: Я не могу придумать никакого способа для итерации, чтобы сигнализировать о том, что во время обработки он столкнулся с ошибкой при обработке, за исключением того, чтобы вернуть его. Или это упрощает их компоновку. В приведенном выше коде показано, как использовать EitherT для обработки ошибок в перечислителе, но как это работает для итераций?

+0

Это может быть полезно вам: http://termsandtruthconditions.herokuapp.com/blog/2013/03/16/free-monad/ – Impredicative

+0

Это хорошее объяснение того, почему мне нужно использовать Trampoline, чтобы избежать переполнения стека, но он не охватывает, как использовать как IO, так и Trampoline. – Redattack34

+0

IO уже батут. – Apocalisp

ответ

3

После создания исключений и печати их длины стека в разных местах вашего кода я чувствовал, что ваш код не переполнен. Кажется, что все работает в постоянном размере стека. Поэтому я искал другие места. В конце концов, я скопировал реализацию consume и добавил некоторую печать глубины стека и подтвердил, что она переполнена там.

Так что это перетекает:

(I.consume[Int, Id, List] &= EnumeratorT.enumStream(Stream.fill(10000)(1))).run 

Но, потом я узнал, что это не делает:

(I.putStrTo[Int](System.out) &= EnumeratorT.enumStream(Stream.fill(10000)(1))) 
    .run.unsafePerformIO() 

putStrTo использует foldM и почему-то не вызывает переполнение. Поэтому мне интересно, может ли быть реализовано consume с точки зрения foldM. Я только что скопировал несколько вещей из потребляемого и измененного до его составления:

def consume1[E, F[_]:Monad, A[_]:PlusEmpty:Applicative]: IterateeT[E, F, A[E]] = { 
    I.foldM[E, F, A[E]](PlusEmpty[A].empty){ (acc: A[E], e: E) => 
    (Applicative[A].point(e) <+> acc).point[F] 
    } 
} 

И это сработало! Печать длинного списка int.

+0

Кажется, что «потребляет1» переполняет Scalaz 7.0.3, по крайней мере для меня. Получаете ли вы тот же результат, если вы увеличите размер потока? Я пытаюсь отследить [потенциально связанную ошибку] ​​(https: // github.com/scalaz/scalaz/issues/554) - Я заметил, что я получаю переполнение стека, если я запускаю контекст «Id», в то время как я получаю ошибку кучи пространства, если я запускаю в «Trampoline». Однако в вашем случае ошибка исчезает в бамбуковом контексте, что заставляет меня подозревать, что проблемы могут быть не связаны друг с другом ... –

+0

@AaronNovstrup, он по-прежнему работает с 100000 и scalaz 7.0.3, поэтому может быть ваша проблема действительно отличается. – huynhjl

+0

Странно. Я вижу переполнение стека с 'consume1' в консоли Scala даже для относительно небольшого количества элементов (100), используя Scala 2.10.2, Scalaz 7.0.3, 64-разрядный сервер OpenJDK VM 1.7.0_25 и размер стека 256k. –

 Смежные вопросы

  • Нет связанных вопросов^_^