2017-02-07 12 views
1

У меня есть следующий Спарк DataFrame:Как получить максимум ArrayType MapTypes с помощью Spark SQL?

df = sql.createDataFrame([ 
     (1, [ 
       {'name': 'john', 'score': '0.8'}, 
       {'name': 'johnson', 'score': '0.9'}, 
      ]), 
     (2, [ 
       {'name': 'jane', 'score': '0.9'}, 
       {'name': 'janine', 'score': '0.4'}, 
      ]), 
     (3, [ 
       {'name': 'sarah', 'score': '0.2'}, 
       {'name': 'sara', 'score': '0.9'}, 
      ]), 
    ], schema=['id', 'names']) 

Спарк правильно выводит схему:

root 
|-- id: long (nullable = true) 
|-- names: array (nullable = true) 
| |-- element: map (containsNull = true) 
| | |-- key: string 
| | |-- value: string (valueContainsNull = true) 

Для каждой строки, я хочу, чтобы выбрать имя с наибольшим количеством очков. Я могу сделать это с помощью Python UDF следующим образом:

import pyspark.sql.types as T 
import pyspark.sql.functions as F 

def top_name(names): 
    return sorted(names, key=lambda d: d['score'], reverse=True)[0]['name'] 

top_name_udf = F.udf(top_name, T.StringType()) 

df.withColumn('top_name', top_name_udf('names')) \ 
    .select('id', 'top_name') \ 
    .show(truncate=False) 

По желанию, вы получите:

+---+--------+ 
|id |top_name| 
+---+--------+ 
|1 |johnson | 
|2 |jane | 
|3 |sara | 
+---+--------+ 

Как я могу это сделать с помощью искровой SQL? Возможно ли это сделать без Python UDF, чтобы данные не были сериализованы между Python и Java?


К сожалению, я бегу Спарк 1.5 и не может использовать registerJavaFunction в Спарк 2.1.

ответ

2

Используйте метод sqlContext.registerFunction, чтобы зарегистрировать функцию (не udf) в sql. Также зарегистрируйте свой df как таблицу sql.

sqlContext.registerDataFrameAsTable(df, "names_df") 

sqlContext.registerFunction("top_name", top_name,T.StringType()) 

sqlContext.sql("SELECT top_name(names) as top_name from names_df").collect() 

> [Row(top_name=u'johnson'), Row(top_name=u'jane'), Row(top_name=u'sara')] 
+0

Спасибо, это работает для меня. Однако возможно ли это сделать _without_ UDF Python и вместо этого использовать чистый SQL? Я бы хотел избежать сериализации между Python и Java. К сожалению, я нахожусь на Spark 1.5 и не имею доступа к ['registerJavaFunction'] (http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=registerjava# pyspark.sql.SQLContext.registerJavaFunction) для регистрации Scala/Java UDF. –

+0

Можете ли вы использовать обычный sql? 'sqlcontext.sql (" SELECT FIRST (name) as top_names FROM df GROUP BY score ORDER BY score DESC; ")' –

+0

К сожалению, не работает. Только столбцы таблицы доступны для группы. Я не думаю, что это возможно сделать, по крайней мере, в этой версии Spark. –