2017-02-04 7 views
0

Каков наилучший способ добавить новый столбец и новые строки в DataFrame?
Возможно ли это сделать одновременно?Spark DataFrame add Column with Rows

К примеру, у меня есть таблица AB, как:

+------+-------+ 
|  a|  b| 
+------+-------+ 
| true| true|  
| true| false| 
+---+---+------+ 

Теперь я хотел бы добавить новый столбец «с» к АВ и новых строк, но только если условие выполнено. Это условие должно применяться для каждой строки в AB, содержащей c = false и c = true.

Пусть foo(row): Boolean быть условием и:

foo(Row(true, true, false)) = true 
foo(Row(true, true, true)) = true 
foo(Row(true, false, false)) = true 
foo(Row(true, false, false)) = false 

Таким образом, новая таблица ABC должна выглядеть так:

+------+-------+-------+ 
    |  a|  b|  c| 
    +------+-------+-------+ 
    | true| true| true|  
    | true| true| false|  
    | true| false| false| 
    +------+-------+-------+ 

Я попытался Crossjoin и фильтр:

val rows = List(Row(true), Row(false)) 

val C = spark.createDataFrame(
    spark.sparkContext.parallelize(rows), 
    StructType(List(StructField("c", BooleanType))) 
) 

val ABC = AB.join(C).filter(r => foo(row)) 

Производительность очень плохой (не могли бы вы мне объяснить почему?). Я также пробовал использовать flatMap:

 val encoder = RowEncoder(AB.schema.add(StructField("c", BooleanType))) 

     val ABC = AB.flatMap { row => 
     Seq(Row.fromSeq(row.toSeq :+ true), Row.fromSeq(row.toSeq :+ false)).filter(r => foo(r)) 
     }(encoder) 

Производительность также плохая. Для литья больших столов требуется слишком много времени. Как я заметил, кастинг применяется на мастере. Для больших таблиц (миллион строк) это плохо.

Есть ли у вас некоторые другие и более эффективные решения этой проблемы?

Btw, я использую Apache Spark 2.0.1 с Scala.

+0

Я честно не понимаю, о чем вы просите. И для записи кросс-соединения всегда имеют плохую производительность, если вы не используете методы хэширования, такие как LSH. – eliasah

+0

Я хотел бы расширить логическую таблицу с новым столбцом и новыми строками. моя старая таблица может содержать 2^n строк и новую таблицу 2^(n + 1) строк (n = | столбцы |). При больших n существует много строк. Поэтому я хотел бы отфильтровать некоторые строки с помощью функции «foo». –

ответ

1

Я думаю, что вы сделали это гораздо сложнее, чем это должно быть, от того, что я понимаю, следующий должен дать результат вы после

val stuff = List[Row](Row(true, true),Row(true, false),Row(false, true), Row(false, false)) 
val rows = sc.parallelize(stuff) 
val schema = StructType(StructField("a", BooleanType, true) :: StructField("b", BooleanType, true) :: Nil) 
val frame = spark.createDataFrame(rows, schema).withColumn("c", col("a")&&(col("b"))) 

тогда, если вы делаете frame.show он должен показать

+-----+-----+-----+ 
| a| b| c| 
+-----+-----+-----+ 
| true| true| true| 
| true|false|false| 
|false| true|false| 
|false|false|false| 
+-----+-----+-----+