7

a Spark DataFrame содержит столбец типа Array [Double]. Он генерирует исключение ClassCastException, когда я пытаюсь вернуть его в функцию map(). Следующий код scala генерирует исключение.Столбец Access Array в Spark

case class Dummy(x:Array[Double]) 
val df = sqlContext.createDataFrame(Seq(Dummy(Array(1,2,3)))) 
val s = df.map(r => { 
    val arr:Array[Double] = r.getAs[Array[Double]]("x") 
    arr.sum 
}) 
s.foreach(println) 

Исключение составляет

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [D 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:24) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:23) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890) 
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Cam кто-нибудь объяснить мне, почему он не работает? что я должен делать вместо этого? Я использую Спарк 1.5.1 и 2.10.6

Скала

Благодарности

ответ

19

ArrayType представлена ​​в как scala.collection.mutable.WrappedArray. Вы можете извлечь его, используя, например

val arr: Seq[Double] = r.getAs[Seq[Double]]("x") 

или

val i: Int = ??? 
val arr = r.getSeq[Double](i) 

или даже:

import scala.collection.mutable.WrappedArray 

val arr: WrappedArray[Double] = r.getAs[WrappedArray[Double]]("x") 

Если DataFrame относительно тонкая, то сопоставление с образцом может быть лучшим подходом:

import org.apache.spark.sql.Row 

df.rdd.map{case Row(x: Seq[Double]) => (x.toArray, x.sum)} 

, хотя вы должны иметь в виду, что тип последовательности не установлен.

В Спарк> = 1,6 можно также использовать Dataset следующим образом:

df.select("x").as[Seq[Double]].rdd