У меня есть следующий Спарк DataFrame:Как получить максимум ArrayType MapTypes с помощью Spark SQL?
df = sql.createDataFrame([
(1, [
{'name': 'john', 'score': '0.8'},
{'name': 'johnson', 'score': '0.9'},
]),
(2, [
{'name': 'jane', 'score': '0.9'},
{'name': 'janine', 'score': '0.4'},
]),
(3, [
{'name': 'sarah', 'score': '0.2'},
{'name': 'sara', 'score': '0.9'},
]),
], schema=['id', 'names'])
Спарк правильно выводит схему:
root
|-- id: long (nullable = true)
|-- names: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: string (valueContainsNull = true)
Для каждой строки, я хочу, чтобы выбрать имя с наибольшим количеством очков. Я могу сделать это с помощью Python UDF следующим образом:
import pyspark.sql.types as T
import pyspark.sql.functions as F
def top_name(names):
return sorted(names, key=lambda d: d['score'], reverse=True)[0]['name']
top_name_udf = F.udf(top_name, T.StringType())
df.withColumn('top_name', top_name_udf('names')) \
.select('id', 'top_name') \
.show(truncate=False)
По желанию, вы получите:
+---+--------+
|id |top_name|
+---+--------+
|1 |johnson |
|2 |jane |
|3 |sara |
+---+--------+
Как я могу это сделать с помощью искровой SQL? Возможно ли это сделать без Python UDF, чтобы данные не были сериализованы между Python и Java?
К сожалению, я бегу Спарк 1.5 и не может использовать registerJavaFunction
в Спарк 2.1.
Спасибо, это работает для меня. Однако возможно ли это сделать _without_ UDF Python и вместо этого использовать чистый SQL? Я бы хотел избежать сериализации между Python и Java. К сожалению, я нахожусь на Spark 1.5 и не имею доступа к ['registerJavaFunction'] (http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=registerjava# pyspark.sql.SQLContext.registerJavaFunction) для регистрации Scala/Java UDF. –
Можете ли вы использовать обычный sql? 'sqlcontext.sql (" SELECT FIRST (name) as top_names FROM df GROUP BY score ORDER BY score DESC; ")' –
К сожалению, не работает. Только столбцы таблицы доступны для группы. Я не думаю, что это возможно сделать, по крайней мере, в этой версии Spark. –