Как я могу использовать «GroupBy (ключ) .agg (» с заданными пользователем функции? В частности мне нужен список всех уникальных значений одного ключа [не рассчитывайте].агрегация кадр данных Pyspark с определенным пользователем функции
ответ
Я нашел pyspark.sql.functions.collect_set(col)
, который делает работу я хотел.
как вы его использовали? не могли бы вы привести пример, пожалуйста? –
collect_set и collect_list (для маркированных и нумерованных результатов соответственно) может быть использован для результатов постобработки GroupBy. Начиная с простым искровым dataframe
df = sqlContext.createDataFrame(
[('first-neuron', 1, [0.0, 1.0, 2.0]),
('first-neuron', 2, [1.0, 2.0, 3.0, 4.0])],
("neuron_id", "time", "V"))
Скажем, цель состоит в том, чтобы урна самой длинной длиной списка V для каждого нейрона (сгруппированной по имени)
from pyspark.sql import functions as F
grouped_df = tile_img_df.groupby('neuron_id').agg(F.collect_list('V'))
Мы теперь сгруппированы списки V в список списков. Поскольку мы хотели наибольшую длину мы можем запустить
import pyspark.sql.types as sq_types
len_udf = F.udf(lambda v_list: int(np.max([len(v) in v_list])),
returnType = sq_types.IntegerType())
max_len_df = grouped_df.withColumn('max_len',len_udf('collect_list(V)'))
Чтобы получить max_len колонки с добавленной максимальной длиной списка V
Насколько я знаю, UDAFs (определяемого пользователь агрегатных функций) не поддерживается by pyspark. Если вы не можете переместить свою логику в Scala, [здесь] (http://stackoverflow.com/questions/33233737/), это может помочь. –