2015-06-04 5 views
3

С помощью SparkR я пытаюсь создать PoC для создания RDD, который я создал из текстовых файлов, который содержит около 4M строк.Сбой при сбое SparkR с OutOfMemory на куче Java Java

Ядро Spark работает в облаке Google, используется bdutil и состоит из 1 мастера и 2 рабочих с 15 ГБ оперативной памяти и 4 ядра каждый. Мой репозиторий HDFS основан на Google Storage с gcs-connector 1.4.0. SparkR запущен на каждой машине, а базовые тесты работают с небольшими файлами.

Вот скрипт я использую:

Sys.setenv("SPARK_MEM" = "1g") 
sc <- sparkR.init("spark://xxxx:7077", sparkEnvir=list(spark.executor.memory="1g")) 
lines <- textFile(sc, "gs://xxxx/dir/") 
test <- collect(lines) 

Первый раз, когда я запускаю это, кажется, работает нормально, все задачи выполняются успешно, щ искры из говорит, что работа завершена, но я никогда не получаю R подскажет назад:

15/06/04 13:36:59 WARN SparkConf: Setting 'spark.executor.extraClassPath' to ':/home/hadoop/hadoop-install/lib/gcs-connector-1.4.0-hadoop1.jar' as a work-around. 
15/06/04 13:36:59 WARN SparkConf: Setting 'spark.driver.extraClassPath' to ':/home/hadoop/hadoop-install/lib/gcs-connector-1.4.0-hadoop1.jar' as a work-around. 
15/06/04 13:36:59 INFO Slf4jLogger: Slf4jLogger started 
15/06/04 13:37:00 INFO Server: jetty-8.y.z-SNAPSHOT 
15/06/04 13:37:00 INFO AbstractConnector: Started [email protected]:52439 
15/06/04 13:37:00 INFO Server: jetty-8.y.z-SNAPSHOT 
15/06/04 13:37:00 INFO AbstractConnector: Started [email protected]:4040 

15/06/04 13:37:54 INFO GoogleHadoopFileSystemBase: GHFS version: 1.4.0-hadoop1 
15/06/04 13:37:55 WARN LoadSnappy: Snappy native library is available 
15/06/04 13:37:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
15/06/04 13:37:55 WARN LoadSnappy: Snappy native library not loaded 
15/06/04 13:37:55 INFO FileInputFormat: Total input paths to process : 68 
[Stage 0:=======================================================>                      (27 + 10)/68] 

Затем после CTRL-C, чтобы получить R подскажет назад, я пытаюсь снова запустить оплаченный метод, вот результат:

[Stage 1:==========================================================>                     (28 + 9)/68]15/06/04 13:42:08 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.remote.default-remote-dispatcher-5] shutting down ActorSystem [sparkDriver] 
java.lang.OutOfMemoryError: Java heap space 
     at org.spark_project.protobuf.ByteString.toByteArray(ByteString.java:515) 
     at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:64) 
     at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104) 
     at scala.util.Try$.apply(Try.scala:161) 
     at akka.serialization.Serialization.deserialize(Serialization.scala:98) 
     at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23) 
     at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58) 
     at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58) 
     at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76) 
     at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937) 
     at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
     at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415) 
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
     at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
     at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) 
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Я понимаю сообщение об исключении, но я не понимаю, почему я получаю это во второй раз. Кроме того, почему сборник никогда не возвращается после завершения в Spark?

Я искал каждую информацию, которую у меня есть, но мне не повезло найти решение. Любая помощь или подсказка будут очень признательны!

Благодаря

+0

Я не знаю об искровом сценарии, но контекст искры должен быть близок к возврату. – Tinku

+0

Спасибо за ваш ответ. Это интерактивный режим, так что это нормально, что я не закрываю контекст здесь. Это похоже на искровую оболочку. – Gouffe

+0

Насколько велика ваша 4M-строка? –

ответ

1

Это действительно кажется простой комбинацией Java в памяти объекта представления неэффективным в сочетании с некоторыми очевидными ссылками долгоживущих объектов, которые вызывают некоторые коллекции, чтобы не быть сборщиком мусора во времени для новый вызов collect() для перезаписывания старого на месте.

Я экспериментировал с некоторыми параметрами, и для моего образца файла размером 256 МБ, который содержит строки ~ 4M, я действительно воспроизвожу ваше поведение, когда сбор в порядке в первый раз, но OOM во второй раз при использовании SPARK_MEM=1g. Затем я устанавливаю SPARK_MEM=4g, а затем я могу ctrl + c и повторно запускать test <- collect(lines) столько раз, сколько хочу.

С одной стороны, даже если ссылки не просачивается, обратите внимание, что после того, как в первый раз вы запускали test <- collect(lines), переменная test держит этот гигантский массив строк, и второй раз при вызове его, collect(lines)выполняет до того, наконец, присваивается переменной test и, таким образом, при любом простом заказе команды, нет способа утилизировать мусор, чтобы собрать старое содержимое test. Это означает, что второй запуск заставит процесс SparkRBackend одновременно удерживать две копии всей коллекции, что приведет к обнаружению OOM.

Для диагностики, на мастер я начал SparkR и первый побежал

[email protected]:~$ jps | grep SparkRBackend 
8709 SparkRBackend 

Я также проверил top и он использует около 22MB памяти. Я притащил профиль кучи с jmap:

jmap -heap:format=b 8709 
mv heap.bin heap0.bin 

Тогда я побежал первый раунд test <- collect(lines) в какой момент запуска top показал его с помощью ~ 1,7 г памяти RES. Я схватил еще одну кучу кучи.Наконец, я также попробовал test <- {}, чтобы избавиться от ссылок, чтобы разрешить сбор мусора. Сделав это и распечатав test и показывая, что он пуст, я схватил еще одну кучу кучи и заметил, что RES все еще показал 1.7g. Я использовал jhat heap0.bin для анализа оригинального дампа кучи, и получил:

Heap Histogram 

All Classes (excluding platform) 

Class Instance Count Total Size 
class [B 25126 14174163 
class [C 19183 1576884 
class [<other> 11841 1067424 
class [Lscala.concurrent.forkjoin.ForkJoinTask; 16 1048832 
class [I 1524 769384 
... 

После запуска собирать, у меня было:

Heap Histogram 

All Classes (excluding platform) 

Class Instance Count Total Size 
class [C 2784858 579458804 
class [B 27768 70519801 
class java.lang.String 2782732 44523712 
class [Ljava.lang.Object; 2567 22380840 
class [I 1538 8460152 
class [Lscala.concurrent.forkjoin.ForkJoinTask; 27 1769904 

Даже после того, как я обнулен test, она осталась примерно такой же. Это показывает нам 2784858 экземпляров char [], для общего размера 579 МБ, а также 2782732 экземпляров String, предположительно содержащих эти char [] над ним. Я следил за ссылочным графиком до конца и получил что-то вроде

char [] -> String -> String [] -> ... -> class scala.collection.mutable.DefaultEntry -> class [Lscala. collection.mutable.HashEntry; -> класс scala.collection.mutable.HashMap -> класс edu.berkeley.cs.amplab.sparkr.JVMObjectTracker $ -> [email protected] (36 bytes) -> [email protected] (138 bytes)

И тогда AppClassLoader имел нечто вроде тысяч входящих ссылок. Поэтому где-то в этой цепочке что-то должно было удалить их ссылку, но не удалось сделать это, в результате чего весь собранный массив сидел в памяти, пока мы пытаемся извлечь вторую копию.

Наконец, чтобы ответить на ваш вопрос о подвесе после collect, похоже, что он связан с данными, не соответствующими памяти процесса R; вот нить, связанная с этой проблемой: https://www.mail-archive.com/[email protected]/msg29155.html

Я подтвердил, что использование меньшего файла только с несколькими строками, а затем запуск collect действительно не виснет.

+0

Привет, Деннис, еще раз спасибо за помощь. Я займусь этим, и я вернусь к вам как можно скорее! – Gouffe

+0

Я не очень хорошо разбираюсь в инструментах анализа памяти Java, я должен был сам это сделать. Спасибо за это! Так что вы говорите, что есть ошибка, которая мешает сборке мусора в какой-то момент, даже если явным образом задал переменную ничего? (просто чтобы убедиться, что понял) Спасибо! – Gouffe

+0

Это тонкий, так что это может быть серая область относительно того, будет ли это считаться опасной ошибкой; Я проверил, что после завершения нового сбора() исходные результаты collect() в конечном итоге будут собраны в мусор, поэтому, если на этом втором вызове нет OOM, память, по-видимому, не продолжает течь в дальнейшем. –