2

У меня есть данные в кадре данных Спарк, с колонной col, который содержит структурированные данные формы:Эффективный способ применения определенной функции для структурированного столбца в кадре данных Spark?

------ col ------- # Column whose elements are structures 
field0 field1 …  # StructType with StructFields (variable names and count) 
[1,2,3] [4,5] [6] # Each field is of type ArrayType 
[1,2] [3] [] 
… 

, где число и имена полей не являются фиксированными.

Каков наиболее эффективный способ вычисления общего количества элементов в каждой строке? В приведенном выше примере, ожидаемый в результате кадр данных, таким образом, будет:

num_elements 
6 
3 
… 

Существует всегда решение определенной пользователем функции:

from pyspark.sql.types import IntegerType 

def num_elements(all_arrays_in_row): 
    return sum(map(len, all_arrays_in_row)) 
num_elements = pyspark.sql.functions.udf(num_elements, IntegerType()) 

data_frame.select(num_elements(data_frame.col)).show() # Number of elements in each row 

Теперь, я не уверен, является ли это вообще эффективный, поскольку:

  1. Функция num_elements() находится в Python.
  2. Если поля по какой-то причине не хранятся вместе, то заставляет выборку каждого массива перед вычислением их длины.

В целом, «чистый» подход Spark был бы более эффективным, но он ускользает от меня. То, что я пытался до сих пор состоит в следующем, но это способ более громоздким, чем выше подхода, а также не является полным:

  1. Получить имена полей field0 и т.д. с [field.name for field in data_frame.select("col").schema.fields[0].dataType.fields] (громоздким).
  2. Для каждого имени поля, эффективно рассчитать размер своего массива:

    sizes_one_field = data_frame.select(pyspark.sql.functions.size(
                data_frame.col.getField(field_name)) 
    

Теперь я застрял в этот момент, потому что я не знаю, как суммируются кадры данных 1-столбцов sizes_one_field (имеется по одному для каждого имени поля). Плюс, может быть, есть способ прямого применения функции size() к каждому полю столбца col в Spark (через какую-то карту?)? Или какой-то совершенно другой подход к получению общего количества элементов в каждой строке?

+0

Вы могли бы поделиться примерными данными и ожидаемым выходом? – mtoto

+0

Хорошая идея. Готово. – EOL

ответ

1

Вы можете попробовать что-то вроде следующего:

from pyspark.sql import functions as f 

result = df.select(sum((f.size(df[col_name]) for col_name in df.columns), f.lit(0))) 

Это решение использует pyspark.sql встроенные функции и будет выполняться оптимизированным способом. Для получения дополнительной информации об этих функциях вы можете проверить его pyspark documentation.

+0

Это очень похоже на хорошее решение, спасибо. Отсутствующая деталь - это массивы не в столбцах, а внутри структуры (уникальный столбец 'col' содержит структуры). Я обновил вопрос, чтобы сделать это очень явным, если это может помочь. Каким будет чистый способ доступа к каждой области? Всегда существует 'df.col.getField (field_name)', но если поля могут быть перекрещены непосредственно, не получив сначала имена полей (что громоздко, насколько я знаю), это будет еще лучше. – EOL

+0

Я бы использовал встроенную функцию sum() 'для суммирования столбцов:' result = df.select (sum ((f.size (df [col_name]) для col_name в df.columns), f.lit (0))) '. Это использует тот факт, что второй аргумент sum() ''- это« нуль »суммы. (За исключением этого, опять же, у меня нет прямых столбцов, но полей в одном столбце.) – EOL

+0

@EOL Ницца, я обновляю свой ответ с помощью функции sum. Что касается лучшего решения, в pyspark я считаю, что вам нужно использовать либо getField, как вы сказали, либо вариант udf (который достаточно эффективен, не беспокойтесь). В Scala вы можете работать с синтаксисом Dataset [Map [String, Array]] и использовать простую карту, чтобы получить желаемый результат (что-то вроде 'ds.map (_. FlatMap (_._ 2) .size) '). –

 Смежные вопросы

  • Нет связанных вопросов^_^