Лучшее решение, которое я придумал, что требует наименьшего количества копирования и вставки новых классов выглядит следующим образом (я все еще хотел бы видеть другое решение, хотя)
Сначала вы должны определить свои случай класса, и (частично) многоразовые метод фабрики
import org.apache.spark.sql.catalyst.expressions
case class MyClass(fooBar: Long, fred: Long)
// Here you want to auto gen these functions using macros or something
object Factories extends java.io.Serializable {
def longLong[T](fac: (Long, Long) => T)(row: expressions.Row): T =
fac(row(0).asInstanceOf[Long], row(1).asInstanceOf[Long])
}
Некоторые котла пластина, которая уже будет доступна
import scala.reflect.runtime.universe._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
волшебное
import scala.reflect.ClassTag
import org.apache.spark.sql.SchemaRDD
def camelToUnderscores(name: String) =
"[A-Z]".r.replaceAllIn(name, "_" + _.group(0).toLowerCase())
def getCaseMethods[T: TypeTag]: List[String] = typeOf[T].members.sorted.collect {
case m: MethodSymbol if m.isCaseAccessor => m
}.toList.map(_.toString)
def caseClassToSQLCols[T: TypeTag]: List[String] =
getCaseMethods[T].map(_.split(" ")(1)).map(camelToUnderscores)
def schemaRDDToRDD[T: TypeTag: ClassTag](schemaRDD: SchemaRDD, fac: expressions.Row => T) = {
val tmpName = "tmpTableName" // Maybe should use a random string
schemaRDD.registerAsTable(tmpName)
sqlContext.sql("SELECT " + caseClassToSQLCols[T].mkString(", ") + " FROM " + tmpName)
.map(fac)
}
Пример использования
val parquetFile = sqlContext.parquetFile(path)
val normalRDD: RDD[MyClass] =
schemaRDDToRDD[MyClass](parquetFile, Factories.longLong[MyClass](MyClass.apply))
Смотрите также:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Convert-SchemaRDD-back-to-RDD-td9071.html
Хотя мне не удалось найти ни одного примера, или документацию, следуя по ссылке JIRA.
Любого обновленное решение по этому вопросу, так как это было первым спросил? – marios