2016-03-08 3 views
0

Вот фрагмент из более крупного кода, который я использую, чтобы прочитать блок данных из Parquet в Scala.Различное поведение при чтении из Паркета в автономной/мастер-рабской искровой раковине

case class COOMatrix(row: Seq[Long], col: Seq[Long], data: Seq[Double]) 

def buildMatrix(cooMatrixFields: DataFrame) = { 

    val cooMatrices = cooMatrixFields map { 
    case Row(r,c,d) => COOMatrix(r.asInstanceOf[Seq[Long]], c.asInstanceOf[Seq[Long]], d.asInstanceOf[Seq[Double]]) 
    } 

    val matEntries = cooMatrices.zipWithIndex.flatMap { 
    case (cooMat, matIndex) => 
     val rowOffset = cooMat.row.distinct.size 
     val colOffset = cooMat.col.distinct.size 

     val cooMatRowShifted = cooMat.row.map(rowEntry => rowEntry + rowOffset * matIndex) 
     val cooMatColShifted = cooMat.col.map(colEntry => colEntry + colOffset * matIndex) 

     (cooMatRowShifted, cooMatColShifted, cooMat.data).zipped.map { 
     case (i, j, value) => MatrixEntry(i, j, value) 
     } 
    } 

    new CoordinateMatrix(matEntries) 
} 


val C_entries = sqlContext.read.load(s"${dataBaseDir}/C.parquet") 

val C = buildMatrix(C_entries) 

Мой код выполняет успешно при работе в локальном контексте искры.

В автономном кластере тот же самый код выходит из строя, как только он достигает действия, которое заставляет его фактически читать из Паркета. Схема dataframe в извлекаются правильно:

C_entries: org.apache.spark.sql.DataFrame = [C_row: array<bigint>, C_col: array<bigint>, C_data: array<double>] 

Но крах исполнителей при выполнении этой строки val C = buildMatrix(C_entries), с этим исключением:

java.lang.ExceptionInInitializerError 
    at $line39.$read$$iwC.<init>(<console>:7) 
    at $line39.$read.<init>(<console>:61) 
    at $line39.$read$.<init>(<console>:65) 
    at $line39.$read$.<clinit>(<console>) 
    at $line67.$read$$iwC.<init>(<console>:7) 
    at $line67.$read.<init>(<console>:24) 
    at $line67.$read$.<init>(<console>:28) 
    at $line67.$read$.<clinit>(<console>) 
    at $line68.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(<console>:63) 
    at $line68.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(<console>:62) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597) 
    at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) 
    at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.NullPointerException 
    at $line4.$read$$iwC$$iwC.<init>(<console>:15) 
    at $line4.$read$$iwC.<init>(<console>:24) 
    at $line4.$read.<init>(<console>:26) 
    at $line4.$read$.<init>(<console>:30) 
    at $line4.$read$.<clinit>(<console>) 
    ... 22 more 

Не уверен, что это связанно, но при увеличении многословия журнала, я» ве заметил это исключение:

16/03/07 20:59:38 INFO GenerateUnsafeProjection: Code generated in 157.285464 ms 
16/03/07 20:59:38 DEBUG ExecutorClassLoader: Did not load class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection from REPL class server at http://155.198.193.158:32862 
java.lang.ClassNotFoundException: Class file not found at URL http://155.198.193.158:32862/org/apache/spark/sql/catalyst/expressions/GeneratedClass%24SpecificUnsafeProjection.class 

Я пробовал различные конфигурации для автономного кластера:

  • мастер, 1 ведомого устройства и искровые-оболочка работает на моем ноутбуке
  • мастер и 1 подчиненный каждое из которых работает на отдельных машинах, искровым оболочки на моем ноутбуке
  • мастер и искровым оболочки на одной машине, 1 ведомый на другой один

Я начал со свойствами по умолчанию и эволюционировал в более замысловатые свойствах файл без большего успеха:

spark.driver.memory    4g 
spark.rpc=netty 
spark.eventLog.enabled    true 
spark.eventLog.dir     file:///mnt/fastmp/spark_workdir/logs 
spark.driver.extraJavaOptions  -Xmx20480m -XX:MaxPermSize=2048m -XX:ReservedCodeCacheSize=2048m 
spark.shuffle.service.enabled  true 
spark.shuffle.consolidateFiles  true 
spark.sql.parquet.binaryAsString true 
spark.speculation     false 
spark.rpc.timeout     1000 
spark.rdd.compress true 
spark.core.connection.ack.wait.timeout 600 
spark.driver.maxResultSize   0 
spark.task.maxFailures    3 
spark.shuffle.io.maxRetries  3 

Я запускаю предварительно построенную версию spark-1.6.0-bin-hadoop2.6. В этом развертывании нет HDFS, все файлы Parquet хранятся на общем оборудовании (CephFS), доступном для всех компьютеров.

Я сомневаюсь, что это связано с базовой файловой системой, так как другая часть моего кода читает отличный файл Parquet как в локальном, так и в автономном режиме.

+0

Итак, если вы вызываете, например, 'C_entries.count' сразу после чтения файла паркета' val C_entries = sqlContext.read.load (s "$ {dataBaseDir} /C.parquet") 'он также терпит неудачу? –

+0

да, я могу. Я даже могу сделать это после того, как аварийные исполнители восстановились. Я думаю, что я помню, что читал что-то об этом, что Спарку не нужно было читать весь файл паркета, чтобы получить количество строк в фреймворке данных, поэтому нет ничего удивительного в том, что он работает. – jopasserat

+0

Хорошо - я не знаю, что Spark может опустить выполнение действия, но это не значит, что это не так;) Я просто хотел разобраться, связана ли проблема с чтением файла паркета (возможно, коррумпированным файл или нулевые значения, где вы их не ожидаете), или если проблема связана с фактическим кодом. Поэтому, если вы считаете, что 'count' может ввести в заблуждение, тогда выполните некоторую тривиальную« карту », за которой следует, скажем,« reduceByKey », а затем подсчитайте. Это _has_ для оценки всего файла паркета при вызове 'reduceByKey'. Если этот простой пример работает, вы узнаете, что он не имеет ничего общего с паркетным файлом. –

ответ

0

TL; DR: пакет кода в качестве баночке

для записи цели, проблема, казалось, связана с использованием автономного кластера.

Точно такой же код прекрасно работает с этими настройками:

  • искровой оболочкой и мастером на ту же машине
  • работает на ПРЯЖАХ (AWS ОГО кластера) и читают паркетных файлы из S3

с немного больше копаться в логах автономной установки, проблема, как представляется, связана с этим исключением с классом сервера:

INFO GenerateUnsafeProjection: Code generated in 157.285464 ms 
DEBUG ExecutorClassLoader: Did not load class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection from REPL class server at http://155.198.193.158:32862 
java.lang.ClassNotFoundException: Class file not found at URL http://155.198.193.158:32862/org/apache/spark/sql/catalyst/expressions/GeneratedClass%24SpecificUnsafeProjection.class 

Я понимаю, что spark-shell запускает HTTP-сервер (jetty), чтобы обслуживать классы, которые он генерирует из кода в REPL для рабочих.

В моем случае много классов обслуживаются успешно (мне даже удалось получить некоторые данные через telnet). Однако класс GeneratedClass (и все его внутренние классы) не может быть найден сервером классов.

Типичное сообщение об ошибке появляется в журнале является:

DEBUG Server: RESPONSE /org/apache/spark/sql/catalyst/expressions/GeneratedClass.class 404 handled=true 

Моя идея заключается в том, что она работает с мастером и искровым оболочки на том же сервере, что они работают в одной и той же виртуальной машины Java, так что класс может быть найден даже если передача HTTP не удалась.

Единственное удачное решение я нашел до сих пор является строить банку пакет и использовать --jars вариант spark-shell или передать его в качестве параметра spark-submit.