2015-06-08 4 views
1

Я пытаюсь прочитать таблицу cassandra, используя драйвер cassandra для искры. Вот код.Задача не вызвала сериализуемого результата в искры

val x = 1 to 2 
val rdd = sc.parallelize(x) 

val query = "Select data from testkeyspace.testtable where id=%d" 

val cc = CassandraConnector(sc.getConf) 

val res1 = 
    rdd.map{ it => 
      cc.withSessionDo{ session => 
      session.execute(query.format(it)) 
     } 
    } 

res1.take(1).foreach(println) 

, но я получаю исключение. Задача не имела сериализуемого результата.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1.0 in stage 24.0 (TID 77) had a not serializable result: com.datastax.driver.core.ArrayBackedResultSet$SinglePage 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Как решить эту проблему?

ответ

1

Объектом, не связанным с сериализацией в нашем преобразовании, является результат, возвращаемый из Cassandra, который является итерируемым по результату запроса. Обычно вы хотите материализовать эту коллекцию в RDD.

Один из способов будет спрашивать все записи, вытекающие из этого запроса:

session.execute(query.format(it)).all() 
+0

даже если я все, что я получаю то же самое исключение. «задача имела не сериализуемый результат» – Knight71

+0

@ Knight71 с той же причиной? – maasg

+0

На этот раз с различным типом данных. 'org.apache.spark.scheduler.TaskSetManager: Задача 1.0 в стадии 1.0 (TID 2) не имела сериализуемого результата: com.datastax.driver.core.ArrayBackedRow; не повторять .org.apache.spark.SparkException: Иск прерывается из-за срыва этапа: Задача 1.0 на этапе 1.0 (TID 2) не имела сериализуемого результата: com.datastax.driver.core.ArrayBackedRow' – Knight71