2014-10-03 5 views
9

В искровых документах ясно, как создавать паркетные файлы с RDD ваших собственных классов случаев; (Из документации)Как преобразовать искра SchemaRDD в RDD моего класса класса?

val people: RDD[Person] = ??? // An RDD of case class objects, from the previous example. 

// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet. 
people.saveAsParquetFile("people.parquet") 

Но не ясно, как преобразовать обратно, на самом деле мы хотим, чтобы метод readParquetFile где мы можем сделать:

val people: RDD[Person] = sc.readParquestFile[Person](path) 

, где определены эти значения класса случае являются те, которые считываются методом.

+0

Любого обновленное решение по этому вопросу, так как это было первым спросил? – marios

ответ

6

Лучшее решение, которое я придумал, что требует наименьшего количества копирования и вставки новых классов выглядит следующим образом (я все еще хотел бы видеть другое решение, хотя)

Сначала вы должны определить свои случай класса, и (частично) многоразовые метод фабрики

import org.apache.spark.sql.catalyst.expressions 

case class MyClass(fooBar: Long, fred: Long) 

// Here you want to auto gen these functions using macros or something 
object Factories extends java.io.Serializable { 
    def longLong[T](fac: (Long, Long) => T)(row: expressions.Row): T = 
    fac(row(0).asInstanceOf[Long], row(1).asInstanceOf[Long]) 
} 

Некоторые котла пластина, которая уже будет доступна

import scala.reflect.runtime.universe._ 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
import sqlContext.createSchemaRDD 

волшебное

import scala.reflect.ClassTag 
import org.apache.spark.sql.SchemaRDD 

def camelToUnderscores(name: String) = 
    "[A-Z]".r.replaceAllIn(name, "_" + _.group(0).toLowerCase()) 

def getCaseMethods[T: TypeTag]: List[String] = typeOf[T].members.sorted.collect { 
    case m: MethodSymbol if m.isCaseAccessor => m 
}.toList.map(_.toString) 

def caseClassToSQLCols[T: TypeTag]: List[String] = 
    getCaseMethods[T].map(_.split(" ")(1)).map(camelToUnderscores) 

def schemaRDDToRDD[T: TypeTag: ClassTag](schemaRDD: SchemaRDD, fac: expressions.Row => T) = { 
    val tmpName = "tmpTableName" // Maybe should use a random string 
    schemaRDD.registerAsTable(tmpName) 
    sqlContext.sql("SELECT " + caseClassToSQLCols[T].mkString(", ") + " FROM " + tmpName) 
    .map(fac) 
} 

Пример использования

val parquetFile = sqlContext.parquetFile(path) 

val normalRDD: RDD[MyClass] = 
    schemaRDDToRDD[MyClass](parquetFile, Factories.longLong[MyClass](MyClass.apply)) 

Смотрите также:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Convert-SchemaRDD-back-to-RDD-td9071.html

Хотя мне не удалось найти ни одного примера, или документацию, следуя по ссылке JIRA.

0

Существует простой способ преобразования схемы rdd в rdd с использованием pyspark в Spark 1.2.1.

sc = SparkContext() ## create SparkContext 
srdd = sqlContext.sql(sql) 
c = srdd.collect() ## convert rdd to list 
rdd = sc.parallelize(c) 

Должен быть аналогичный подход с использованием scala.

+0

Этот (сбор) будет работать для небольшого сбора данных, но если у вас много записей. – craighagerman

+0

Я думаю, что вам может не хватать точки вопроса, мы хотим, чтобы функции типа-провайдера были такими же функциональными. Поскольку python имеет довольно слабую систему типов и динамичен, я сомневаюсь, что в мире python люди действительно заботятся. Мы создаем приложения, которые должны быть очень стабильными, поэтому мы используем язык с надлежащей системой типов и нуждаемся в функциях типа-провайдера. – samthebest

+0

извините, я пропустил пункт. У меня есть небольшая загадка о вашем решении. 'QqlContext.sql (" SELECT "+ caseClassToSQLCols [T] .mkString (", ") +" FROM "+ tmpName)' возвращает объект srdd, поэтому у него нет метода карты. И метод map просто используется для применения функции к каждому элементу. Я не мог найти, что ваш код может преобразовать srdd в rdd.Не могли бы вы сказать мне, если я пропущу какую-то важную вещь в вашем коде. Спасибо! – keddie

-1

Очень грубая попытка. Очень неуверенно это будет иметь достойную производительность. Конечно, должна макро на основе альтернативных ...

import scala.reflect.runtime.universe.typeOf 
import scala.reflect.runtime.universe.MethodSymbol 
import scala.reflect.runtime.universe.NullaryMethodType 
import scala.reflect.runtime.universe.TypeRef 
import scala.reflect.runtime.universe.Type 
import scala.reflect.runtime.universe.NoType 
import scala.reflect.runtime.universe.termNames 
import scala.reflect.runtime.universe.runtimeMirror 

schemaRdd.map(row => RowToCaseClass.rowToCaseClass(row.toSeq, typeOf[X], 0)) 

object RowToCaseClass { 
    // http://dcsobral.blogspot.com/2012/08/json-serialization-with-reflection-in.html 
    def rowToCaseClass(record: Seq[_], t: Type, depth: Int): Any = { 
    val fields = t.decls.sorted.collect { 
     case m: MethodSymbol if m.isCaseAccessor => m 
    } 
    val values = fields.zipWithIndex.map { 
     case (field, i) => 
     field.typeSignature match { 
      case NullaryMethodType(sig) if sig =:= typeOf[String] => record(i).asInstanceOf[String] 
      case NullaryMethodType(sig) if sig =:= typeOf[Int] => record(i).asInstanceOf[Int] 
      case NullaryMethodType(sig) => 
      if (sig.baseType(typeOf[Seq[_]].typeSymbol) != NoType) { 
       sig match { 
       case TypeRef(_, _, args) => 
        record(i).asInstanceOf[Seq[Seq[_]]].map { 
        r => rowToCaseClass(r, args(0), depth + 1) 
        }.toSeq 
       } 
      } else { 
       sig match { 
       case TypeRef(_, u, _) => 
        rowToCaseClass(record(i).asInstanceOf[Seq[_]], sig, depth + 1) 
       } 
      } 
     } 
    }.asInstanceOf[Seq[Object]] 
    val mirror = runtimeMirror(t.getClass.getClassLoader) 
    val ctor = t.member(termNames.CONSTRUCTOR).asMethod 
    val klass = t.typeSymbol.asClass 
    val method = mirror.reflectClass(klass).reflectConstructor(ctor) 
    method.apply(values: _*) 
    } 
} 
5

простой способ обеспечить свой собственный конвертер (Row) => CaseClass. Это немного более ручной, но если вы знаете, что читаете, это должно быть довольно просто.

Вот пример:

import org.apache.spark.sql.SchemaRDD 

case class User(data: String, name: String, id: Long) 

def sparkSqlToUser(r: Row): Option[User] = { 
    r match { 
     case Row(time: String, name: String, id: Long) => Some(User(time,name, id)) 
     case _ => None 
    } 
} 

val parquetData: SchemaRDD = sqlContext.parquetFile("hdfs://localhost/user/data.parquet") 

val caseClassRdd: org.apache.spark.rdd.RDD[User] = parquetData.flatMap(sparkSqlToUser)