GraphX VertexRDD NullPointerException

I am trying to do some message passing on a graph to compute recursive functions. I get an error when I define a graph whose vertices are the output of aggregateMessages. Code for context:
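newVertices and edges are built from my data elsewhere; a minimal hypothetical stand-in with the same types would look like the following (sc is the spark-shell SparkContext; the outputs shown below are from my real data):

import org.apache.spark.graphx.{Edge, Graph, TripletFields, VertexId}
import org.apache.spark.rdd.RDD

// Hypothetical stand-ins: every vertex carries a List[Double] of the same
// length, so the element-wise zip in the merge function below lines up.
val newVertices: RDD[(VertexId, List[Double])] = sc.parallelize(Seq(
  (1L, List(0.0, 0.0)),
  (2L, List(1.0, 2.0)),
  (3L, List(3.0, 4.0))))
val edges: RDD[Edge[Int]] = sc.parallelize(Seq(
  Edge(1L, 2L, 1),
  Edge(2L, 3L, 1)))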
> val newGraph = Graph(newVertices, edges)
newGraph: org.apache.spark.graphx.Graph[List[Double],Int] = org.apache.spark.graphx.impl.GraphImpl@...
//This is the RDD that causes the problem
> val result = newGraph.aggregateMessages[List[Double]](
{triplet => triplet.sendToDst(triplet.srcAttr)},
{(a,b) => a.zip(b).map { case (x, y) => x + y }},
{TripletFields.Src})
result: org.apache.spark.graphx.VertexRDD[List[Double]] = VertexRDDImpl[1990] at RDD at VertexRDD.scala:57
> result.take(1)
res121: Array[(org.apache.spark.graphx.VertexId, List[Double])] = Array((1944425548,List(0.0, 0.0, 137.0, 292793.0)))
So far no problems, but when I try
> val newGraph2 = Graph(result, edges)
newGraph2: org.apache.spark.graphx.Graph[List[Double],Int] = org.apache.spark.graphx.impl.GraphImpl@...
> val result2 = newGraph2.aggregateMessages[List[Double]](
{triplet => triplet.sendToDst(triplet.srcAttr)},
{(a,b) => a.zip(b).map { case (x, y) => x + y }},
{TripletFields.Src})
result2: org.apache.spark.graphx.VertexRDD[List[Double]] = VertexRDDImpl[2009] at RDD at VertexRDD.scala:57
> result2.count
I get the following (truncated) error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4839.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4839.0 (TID 735, 10.0.2.15): java.lang.NullPointerException
at $anonfun$2.apply(<console>:62)
at $anonfun$2.apply(<console>:62)
at org.apache.spark.graphx.impl.AggregatingEdgeContext.send(EdgePartition.scala:536)
at org.apache.spark.graphx.impl.AggregatingEdgeContext.sendToDst(EdgePartition.scala:531)
at $anonfun$1.apply(<console>:61)
at $anonfun$1.apply(<console>:61)
at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409)
at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:237)
at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:207)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
...
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
...
Caused by: java.lang.NullPointerException
at $anonfun$2.apply(<console>:62)
at $anonfun$2.apply(<console>:62)
at org.apache.spark.graphx.impl.AggregatingEdgeContext.send(EdgePartition.scala:536)
at org.apache.spark.graphx.impl.AggregatingEdgeContext.sendToDst(EdgePartition.scala:531)
at $anonfun$1.apply(<console>:61)
at $anonfun$1.apply(<console>:61)
at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409)
at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:237)
at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:207)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
... 3 more
I don't think this is a type-mismatch error, since aggregateMessages returns a VertexRDD. Any ideas why I am getting this problem?
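The only other check I can think of (a hypothetical sketch, using the same names as above): whether edges references vertex ids that are missing from result, since Graph(vertices, edges) fills in a default attribute for any endpoint that is absent from the vertex RDD.

// Hypothetical sanity check: edge endpoints with no entry in `result`
// would receive the default vertex attribute when newGraph2 is built.
val presentIds = result.map { case (id, _) => id }
val referencedIds = edges.flatMap(e => Seq(e.srcId, e.dstId)).distinct()
println(referencedIds.subtract(presentIds).count())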