Вот фрагмент из более крупного кода, который я использую, чтобы прочитать блок данных из Parquet в Scala.Различное поведение при чтении из Паркета в автономной/мастер-рабской искровой раковине
case class COOMatrix(row: Seq[Long], col: Seq[Long], data: Seq[Double])
def buildMatrix(cooMatrixFields: DataFrame) = {
val cooMatrices = cooMatrixFields map {
case Row(r,c,d) => COOMatrix(r.asInstanceOf[Seq[Long]], c.asInstanceOf[Seq[Long]], d.asInstanceOf[Seq[Double]])
}
val matEntries = cooMatrices.zipWithIndex.flatMap {
case (cooMat, matIndex) =>
val rowOffset = cooMat.row.distinct.size
val colOffset = cooMat.col.distinct.size
val cooMatRowShifted = cooMat.row.map(rowEntry => rowEntry + rowOffset * matIndex)
val cooMatColShifted = cooMat.col.map(colEntry => colEntry + colOffset * matIndex)
(cooMatRowShifted, cooMatColShifted, cooMat.data).zipped.map {
case (i, j, value) => MatrixEntry(i, j, value)
}
}
new CoordinateMatrix(matEntries)
}
val C_entries = sqlContext.read.load(s"${dataBaseDir}/C.parquet")
val C = buildMatrix(C_entries)
Мой код выполняет успешно при работе в локальном контексте искры.
В автономном кластере тот же самый код выходит из строя, как только он достигает действия, которое заставляет его фактически читать из Паркета. Схема dataframe в извлекаются правильно:
C_entries: org.apache.spark.sql.DataFrame = [C_row: array<bigint>, C_col: array<bigint>, C_data: array<double>]
Но крах исполнителей при выполнении этой строки val C = buildMatrix(C_entries)
, с этим исключением:
java.lang.ExceptionInInitializerError
at $line39.$read$$iwC.<init>(<console>:7)
at $line39.$read.<init>(<console>:61)
at $line39.$read$.<init>(<console>:65)
at $line39.$read$.<clinit>(<console>)
at $line67.$read$$iwC.<init>(<console>:7)
at $line67.$read.<init>(<console>:24)
at $line67.$read$.<init>(<console>:28)
at $line67.$read$.<clinit>(<console>)
at $line68.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(<console>:63)
at $line68.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$3.apply(<console>:62)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1597)
at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
at org.apache.spark.rdd.ZippedWithIndexRDD$$anonfun$2.apply(ZippedWithIndexRDD.scala:52)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at $line4.$read$$iwC$$iwC.<init>(<console>:15)
at $line4.$read$$iwC.<init>(<console>:24)
at $line4.$read.<init>(<console>:26)
at $line4.$read$.<init>(<console>:30)
at $line4.$read$.<clinit>(<console>)
... 22 more
Не уверен, что это связанно, но при увеличении многословия журнала, я» ве заметил это исключение:
16/03/07 20:59:38 INFO GenerateUnsafeProjection: Code generated in 157.285464 ms
16/03/07 20:59:38 DEBUG ExecutorClassLoader: Did not load class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection from REPL class server at http://155.198.193.158:32862
java.lang.ClassNotFoundException: Class file not found at URL http://155.198.193.158:32862/org/apache/spark/sql/catalyst/expressions/GeneratedClass%24SpecificUnsafeProjection.class
Я пробовал различные конфигурации для автономного кластера:
- мастер, 1 ведомого устройства и искровые-оболочка работает на моем ноутбуке
- мастер и 1 подчиненный каждое из которых работает на отдельных машинах, искровым оболочки на моем ноутбуке
- мастер и искровым оболочки на одной машине, 1 ведомый на другой один
Я начал со свойствами по умолчанию и эволюционировал в более замысловатые свойствах файл без большего успеха:
spark.driver.memory 4g
spark.rpc=netty
spark.eventLog.enabled true
spark.eventLog.dir file:///mnt/fastmp/spark_workdir/logs
spark.driver.extraJavaOptions -Xmx20480m -XX:MaxPermSize=2048m -XX:ReservedCodeCacheSize=2048m
spark.shuffle.service.enabled true
spark.shuffle.consolidateFiles true
spark.sql.parquet.binaryAsString true
spark.speculation false
spark.rpc.timeout 1000
spark.rdd.compress true
spark.core.connection.ack.wait.timeout 600
spark.driver.maxResultSize 0
spark.task.maxFailures 3
spark.shuffle.io.maxRetries 3
Я запускаю предварительно построенную версию spark-1.6.0-bin-hadoop2.6. В этом развертывании нет HDFS, все файлы Parquet хранятся на общем оборудовании (CephFS), доступном для всех компьютеров.
Я сомневаюсь, что это связано с базовой файловой системой, так как другая часть моего кода читает отличный файл Parquet как в локальном, так и в автономном режиме.
Итак, если вы вызываете, например, 'C_entries.count' сразу после чтения файла паркета' val C_entries = sqlContext.read.load (s "$ {dataBaseDir} /C.parquet") 'он также терпит неудачу? –
да, я могу. Я даже могу сделать это после того, как аварийные исполнители восстановились. Я думаю, что я помню, что читал что-то об этом, что Спарку не нужно было читать весь файл паркета, чтобы получить количество строк в фреймворке данных, поэтому нет ничего удивительного в том, что он работает. – jopasserat
Хорошо - я не знаю, что Spark может опустить выполнение действия, но это не значит, что это не так;) Я просто хотел разобраться, связана ли проблема с чтением файла паркета (возможно, коррумпированным файл или нулевые значения, где вы их не ожидаете), или если проблема связана с фактическим кодом. Поэтому, если вы считаете, что 'count' может ввести в заблуждение, тогда выполните некоторую тривиальную« карту », за которой следует, скажем,« reduceByKey », а затем подсчитайте. Это _has_ для оценки всего файла паркета при вызове 'reduceByKey'. Если этот простой пример работает, вы узнаете, что он не имеет ничего общего с паркетным файлом. –