2016-05-17 2 views
2

Как я могу использовать «GroupBy (ключ) .agg (» с заданными пользователем функции? В частности мне нужен список всех уникальных значений одного ключа [не рассчитывайте].агрегация кадр данных Pyspark с определенным пользователем функции

+0

Насколько я знаю, UDAFs (определяемого пользователь агрегатных функций) не поддерживается by pyspark. Если вы не можете переместить свою логику в Scala, [здесь] (http://stackoverflow.com/questions/33233737/), это может помочь. –

ответ

1

Я нашел pyspark.sql.functions.collect_set(col), который делает работу я хотел.

+0

как вы его использовали? не могли бы вы привести пример, пожалуйста? –

0

collect_set и collect_list (для маркированных и нумерованных результатов соответственно) может быть использован для результатов постобработки GroupBy. Начиная с простым искровым dataframe

df = sqlContext.createDataFrame(
[('first-neuron', 1, [0.0, 1.0, 2.0]), 
('first-neuron', 2, [1.0, 2.0, 3.0, 4.0])], 
("neuron_id", "time", "V")) 

Скажем, цель состоит в том, чтобы урна самой длинной длиной списка V для каждого нейрона (сгруппированной по имени)

from pyspark.sql import functions as F 
grouped_df = tile_img_df.groupby('neuron_id').agg(F.collect_list('V')) 

Мы теперь сгруппированы списки V в список списков. Поскольку мы хотели наибольшую длину мы можем запустить

import pyspark.sql.types as sq_types 
len_udf = F.udf(lambda v_list: int(np.max([len(v) in v_list])), 
        returnType = sq_types.IntegerType()) 

max_len_df = grouped_df.withColumn('max_len',len_udf('collect_list(V)')) 

Чтобы получить max_len колонки с добавленной максимальной длиной списка V