Я пытаюсь использовать пакет scalaz iteratee для обработки большого zip-файла в постоянном пространстве. У меня есть длительный процесс, который мне нужно выполнить для каждого файла в zip-файле. Эти процессы могут (и должны) выполняться параллельно.Scalaz 7 Iteratee для обработки большого zip-файла (OutOfMemoryError)
Я создал EnumeratorT
, который надувает каждый ZipEntry
в объект File
. Подпись выглядит следующим образом:
def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]
Я хочу приложить IterateeT
, которая будет выполнять длительный процесс по каждому файлу. Я в основном в конечном итоге что-то вроде:
type IOE[A] = IoExceptionOr[A]
def action(f:File):IO[List[Promise[IOE[File]]]] = (
consume[Promise[IOE[File]], IO, List] %=
map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
enumZipFile(f)
).run
def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] =
Promise { Thread.sleep(5000); iof }
Когда я пытаюсь запустить его:
action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get
Я получаю java.lang.OutOfMemoryError: Java heap space
сообщений. Это имеет смысл для меня, поскольку он пытается создать массивный список в память обо всех этих объектах IO
и Promise
.
Несколько вопросов:
- Кто-нибудь есть какие-либо идеи о том, как избежать этого? Похоже, что я неправильно подхожу к проблеме, потому что я действительно забочусь только об
longRunningProcess
за ее побочные эффекты. - Подходит ли подход
Enumerator
к неправильному подходу?
У меня довольно много идей, поэтому все поможет.
Спасибо!
Update # 1
Вот трассировки стека:
[error] java.lang.OutOfMemoryError: Java heap space
[error] at scalaz.Free.flatMap(Free.scala:46)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
Я в настоящее время совет nadavwr, чтобы убедиться, что все действует, как я думаю, что это. Я буду сообщать о любых обновлениях.
Update # 2
Используя идеи из обоих ответов ниже, я нашел достойное решение. Как предположил huynhjl (и я проверил использование предложения nadavwr по анализу дампа кучи), consume
заставляло каждый завышенный ZipEntry
удерживаться в памяти, поэтому процесс заканчивался из памяти. Я изменил consume
на foldM
и обновил длительный процесс, чтобы просто вернуть Promise[IOE[Unit]]
вместо ссылки на файл. Таким образом, у меня есть коллекция всех IoExceptions в конце. Вот рабочее решение:
def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
enumZipFile(f)
).run
def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] =
Promise { Thread.sleep(5000); iof.map(println) }
Это решение раздувает каждую запись при асинхронной загрузке. В конце концов, у меня есть огромный список выполненных объектов Promise
, содержащих какие-либо ошибки. Я до сих пор не полностью убежден, что это правильное использование Iteratee, но теперь у меня есть несколько многоразовых, составных частей, которые я могу использовать в других частях нашей системы (это очень распространенная картина для нас).
Спасибо за вашу помощь!
Что делает длительный процесс? Вычисляет ли это что-то из zip-контента? – huynhjl
Каждый файл в zip-файле - это изображение. Длительный процесс загружает этот файл в Rackspace CloudFiles. Как только я это выясню, мне нужно будет добавить дополнительные процессы, которые изменят размер изображений, а затем загрузите их. –
Iteratees чувствует себя как неправильная абстракция для этой работы, поскольку вы хотите распараллелить рабочую нагрузку. Думаю, актеры будут работать лучше. – huynhjl