2016-09-11 3 views
3

Я пытаюсь сгладить схему существующего фрейма данных с вложенными полями. Структура моей dataframe что-то вроде этого:Explode array in apache spark Data Frame

root 
|-- Id: long (nullable = true) 
|-- Type: string (nullable = true) 
|-- Uri: string (nullable = true)  
|-- Type: array (nullable = true) 
| |-- element: string (containsNull = true) 
|-- Gender: array (nullable = true) 
| |-- element: string (containsNull = true) 

Тип и пол может содержать массив элементов, один элемент или нулевое значение. Я пытался использовать следующий код:

var resDf = df.withColumn("FlatType", explode(df("Type"))) 

Но в результате в результате кадра данных, я освобождаю строки, для которых я имел нулевые значения для типа столбца. Это означает, что, например, если у меня есть 10 строк и в 7 строках, тип имеет значение null и в 3-х типах не является нулевым, после того, как я использую explode в результирующем фрейме данных, у меня есть только три строки.

Как сохранить строки с нулевыми значениями, но взорвать массив значений?

Я нашел какое-то обходное решение, но все еще застрял в одном месте. Для стандартных типов мы можем сделать следующее:

def customExplode(df: DataFrame, field: String, colType: String): org.apache.spark.sql.Column = { 
var exploded = None: Option[org.apache.spark.sql.Column] 
colType.toLowerCase() match { 
    case "string" => 
    val avoidNull = udf((column: Seq[String]) => 
    if (column == null) Seq[String](null) 
    else column) 
    exploded = Some(explode(avoidNull(df(field)))) 
    case "boolean" => 
    val avoidNull = udf((xs: Seq[Boolean]) => 
    if (xs == null) Seq[Boolean]() 
    else xs) 
    exploded = Some(explode(avoidNull(df(field)))) 
    case _ => exploded = Some(explode(df(field))) 
} 
exploded.get 

}

И после этого просто использовать его как этот:

val explodedField = customExplode(resultDf, fieldName, fieldTypeMap(field)) 
resultDf = resultDf.withColumn(newName, explodedField) 

Однако, у меня есть проблема для типа структуры для следующего типа конструкции:

|-- Address: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- AddressType: array (nullable = true) 
| | | |-- element: string (containsNull = true) 
| | |-- DEA: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- Number: array (nullable = true) 
| | | | | |-- element: string (containsNull = true) 
| | | | |-- ExpirationDate: array (nullable = true) 
| | | | | |-- element: timestamp (containsNull = true) 
| | | | |-- Status: array (nullable = true) 
| | | | | |-- element: string (containsNull = true) 

Как мы можем обработать такую ​​схему, когда DEA равно null?

Заранее спасибо.

P.S. Я попытался использовать Lateral views, но результат тот же.

ответ

1

Может быть, вы можете попробовать использовать when:

val resDf = df.withColumn("FlatType", when(df("Type").isNotNull, explode(df("Type"))) 

Как показано в when функции documentation-х, значение null вставляется для значений, которые не соответствуют условиям.

+0

Извините, но когда я попробую это решение, у меня есть следующее исключение: java.lang.UnsupportedOperationException. Если я заменил explode (df («Тип»)) только с некоторым значением - он отлично работает. Я полагаю, когда функция не поддерживает взорванный столбец как значение – Artem

+0

@Artem, вы правы, извините. Является ли «союз» опцией для вас? Вы можете сделать 'df.where ($« Тип ».isNull) .withColumn (« FlatType », lit (null)). UnionAll (df.withColumn (« FlatType », explode ($« Тип »)))' –

+0

yes , спасибо, я думал об этом варианте, но я создаю общий алгоритм для выравнивания схемы, и я боюсь, что союз может быть очень медленным. Я надеюсь найти лучшее решение, но объединение - это резервный вариант для меня. – Artem