Мы пытаемся представить искровое задание (искра 2.0, hadoop 2.7.2), но по какой-то причине мы получаем довольно загадочный NPE в EMR. Все работает отлично, как программа scala, поэтому мы не уверены, что вызывает проблему. Вот трассировки стека:NullPointerException в Spark RDD-карта при отправке в качестве искрового задания
18: 02: 55271 ОШИБКА Utils: 91 - Aborting задачу java.lang.NullPointerException в org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIterator.agg_doAggregateWithKeys $ (Неизвестный источник) at org.apache.spark.sql.catalyst.expressions.GeneratedClass $ GeneratedIterator.processNext (Неизвестный источник) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext (BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec $$ anonfun $ 8 $$ anon $ 1.hasNext (WholeStageCodegenExec.scala: 370) at scala.collection.Iterator $$ anon $ 12.hasNext (Iterator.scala: 438) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer $$ anonfun $ writeRows $ 1.apply $ mcV $ sp (WriterContainer.scala: 253) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer $ $ anonfun $ writeRows $ 1.apply (WriterContainer.scala: 252) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer $$ anonfun $ writeRows $ 1.apply (WriterContainer.scala: 252) at org.apache. spark.util.Utils $ .tryWithSafeFinallyAndFailureCallbacks (Utils.scala: 1325) at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows (WriterContainer.scala: 258) at org.apache.spark.sql.execution .datasources.InsertIntoHadoopFsRelationCommand $$ anonfun $ run $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (InsertIntoHadoopFsRelationCommand.scala: 143) at org.apache.spark.sql.execution.datasource s.InsertIntoHadoopFsRelationCommand $$ anonfun $ run $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (InsertIntoHadoopFsRelationCommand.scala: 143) at org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 70) at org.apache.spark.scheduler.Task.run (Task.scala: 85) at org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 274) at java.util.concurrent.ThreadPoolExecutor. runWorker (ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) в java.lang.Thread.run (Thread.java:745)
Насколько нам известно, это происходит по следующему методу:
def process(dataFrame: DataFrame, S3bucket: String) = {
dataFrame.map(row =>
"text|label"
).coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}
Мы сузили функции отображения, как это работает, когда они представляются как искры работы:
def process(dataFrame: DataFrame, S3bucket: String) = {
dataFrame.coalesce(1).write.mode(SaveMode.Overwrite).text(S3bucket)
}
Кто-нибудь есть какие-либо идеи, что может быть причиной этой проблемы? Также, как мы можем это решить? Мы очень в тупике.
Вы пробовали без 'coalesce()'? – gsamaras
@ gsamaras Нет! Но, похоже, он работает без объединения. Что тут происходит? – cscan