Я пытаюсь прочитать таблицу cassandra, используя драйвер cassandra для искры. Вот код.Задача не вызвала сериализуемого результата в искры
val x = 1 to 2
val rdd = sc.parallelize(x)
val query = "Select data from testkeyspace.testtable where id=%d"
val cc = CassandraConnector(sc.getConf)
val res1 =
rdd.map{ it =>
cc.withSessionDo{ session =>
session.execute(query.format(it))
}
}
res1.take(1).foreach(println)
, но я получаю исключение. Задача не имела сериализуемого результата.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0 in stage 24.0 (TID 77) had a not serializable result: com.datastax.driver.core.ArrayBackedResultSet$SinglePage
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Как решить эту проблему?
даже если я все, что я получаю то же самое исключение. «задача имела не сериализуемый результат» – Knight71
@ Knight71 с той же причиной? – maasg
На этот раз с различным типом данных. 'org.apache.spark.scheduler.TaskSetManager: Задача 1.0 в стадии 1.0 (TID 2) не имела сериализуемого результата: com.datastax.driver.core.ArrayBackedRow; не повторять .org.apache.spark.SparkException: Иск прерывается из-за срыва этапа: Задача 1.0 на этапе 1.0 (TID 2) не имела сериализуемого результата: com.datastax.driver.core.ArrayBackedRow' – Knight71