2017-02-22 15 views
0

Привет у меня есть кадр данных искры, которая печатает как этот (один ряд)Как Свести искра dataframe Row нескольких Dataframe Рядов

[abc,WrappedArray(11918,1233),WrappedArray(46734,1234),1487530800317] 

Так внутри строк я завернутый массив, я хочу, чтобы сгладить это и создать dataframe, который имеет одно значение для каждого массива, например, выше строки должны преобразовать что-то вроде этого

[abc,11918,46734,1487530800317] 
[abc,1233,1234,1487530800317] 

Так я получил dataframe с 2 строк вместо 1, Таким образом, каждый соответствующий элемент из обернутого массива должны идти в новой строке.

Edit 1 после 1-го ответа: Что делать, если у меня есть 3 массивы в моем входе

WrappedArray(46734,1234,[abc,WrappedArray(11918,1233),WrappedArray(46734,1234),WrappedArray(1,2),1487530800317] 

мой вывод должен быть

[abc,11918,46734,1,1487530800317] 
[abc,1233,1234,2,1487530800317] 

ответ

0

Определенно не лучшее решение, но это будет работать :

case class TestFormat(a: String, b: Seq[String], c: Seq[String], d: String) 

val data = Seq(TestFormat("abc", Seq("11918","1233"), 
      Seq("46734","1234"), "1487530800317")).toDS 

val zipThem: (Seq[String], Seq[String]) => Seq[(String, String)] = _.zip(_) 

val udfZip = udf(zipThem) 

data.select($"a", explode(udfZip($"b", $"c")) as "tmp", $"d") 
    .select($"a", $"tmp._1" as "b", $"tmp._2" as "c", $"d") 
    .show 

Проблема в том, что по умолчанию вы не можете быть уверены, что обе последовательности одинаковой длины.

Вероятно, лучшим решением было бы переформатировать весь кадр данных в структуру, которая моделирует данные, например.

root 
-- a 
-- d 
-- records 
---- b 
---- c 
+0

Спасибо за ответ, я если у меня есть 3 массива [abc, WrappedArray (11918,1233), WrappedArray (46734,1234), WrappedArray (1,2), 1487530800317] –

+0

, если вы в порядке с ответом, вы можете проголосовать вместо O Благодарю! –

0

Спасибо за ответ @swebbo, вы ответите помогли мне получить это сделано:

Я сделал это:

import org.apache.spark.sql.functions.{explode, udf} 
import sqlContext.implicits._ 
val zipColumns = udf((x: Seq[Long], y: Seq[Long], z: Seq[Long]) => (x.zip(y).zip(z)) map { 
    case ((a,b),c) => (a,b,c) 
}) 

val flattened = subDf.withColumn("columns", explode(zipColumns($"col3", $"col4", $"col5"))).select(
    $"col1", $"col2", 
    $"columns._1".alias("col3"), $"columns._2".alias("col4"), $"columns._3".alias("col5")) 
flattened.show 

Надеется, что понятно :)

 Смежные вопросы

  • Нет связанных вопросов^_^