2015-06-10 1 views
4

Мне нужна помощь с вложенной структурой в SparkSQL с использованием метода sql. Я создал фрейм данных поверх существующей RDD (dataRDD) со структурой, как это:SparkSQL - вложенные структуры Row (field1, field2 = Row (..))

schema=StructType([ StructField("m",LongType()) , 
        StructField("field2", StructType([ 
        StructField("st",StringType()), 
        StructField("end",StringType()), 
        StructField("dr",IntegerType()) ])) 
        ]) 

printSchema() возвращает это:

root 
|-- m: long (nullable = true) 
|-- field2: struct (nullable = true) 
| |-- st: string (nullable = true) 
| |-- end: string (nullable = true) 
| |-- dr: integer (nullable = true) 

Создание фрейма данных из ДРРА данных и применяя схема работает хорошо.

df= sqlContext.createDataFrame(dataRDD, schema) 
df.registerTempTable("logs") 

Но получение данных не работает:

res = sqlContext.sql("SELECT m, field2.st FROM logs") # <- This fails 

...org.apache.spark.sql.AnalysisException: cannot resolve 'field.st' given input columns msisdn, field2; 

res = sqlContext.sql("SELECT m, field2[0] FROM logs") # <- Also fails 
...org.apache.spark.sql.AnalysisException: unresolved operator 'Project [field2#1[0] AS c0#2]; 

res = sqlContext.sql("SELECT m, st FROM logs") # <- Also not working 
...cannot resolve 'st' given input columns m, field2; 

Так как я могу получить доступ к вложенной структуры в синтаксисе SQL? Благодаря

ответ

3

Вы что-то еще происходит в тестировании, так как field2.st правильный синтаксис:

case class field2(st: String, end: String, dr: Int) 

val schema = StructType(
    Array(
    StructField("m",LongType), 
    StructField("field2", StructType(Array(
     StructField("st",StringType), 
     StructField("end",StringType), 
     StructField("dr",IntegerType) 
    ))) 
) 
) 

val df2 = sqlContext.createDataFrame(
    sc.parallelize(Array(Row(1,field2("this","is",1234)),Row(2,field2("a","test",5678)))), 
    schema 
) 

/* df2.printSchema 
root 
|-- m: long (nullable = true) 
|-- field2: struct (nullable = true) 
| |-- st: string (nullable = true) 
| |-- end: string (nullable = true) 
| |-- dr: integer (nullable = true) 
*/ 

val results = sqlContext.sql("select m,field2.st from df2") 

/* results.show 
m st 
1 this 
2 a 
*/ 

Оглянитесь на ваше сообщение об ошибке: cannot resolve 'field.st' given input columns msisdn, field2 - field VS. field2. Проверьте свой код еще раз - имена не выстраиваются в линию.

+0

Да, это была опечатка. Но все же field2.st выбрасывает эту ошибку: 18:19:05 WARN TaskSetManager: потерянная задача 1.0 в стадии 1.0 (TID 2, BICHDP2.TD): java.lang.ClassCastException: java.util.ArrayList нельзя передать в org. apache.spark.sql.Row at org.apache.spark.sql.catalyst.expressions.StructGetField.eval (complexTypes.scala: 93) at org.apache.spark.sql.catalyst.expressions.Alias.eval (namedExpressions .scala: 113) at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply (Projection.scala: 68) at – frengel