2017-02-19 68 views
-1

У меня есть данные с 4 столбцами (c1, c2, c3 и c4) и есть в RDD через некоторый код scala.Spark - группа на одной колонке и находить среднее значение других колонок

Я хочу группировать/bin с помощью c1 и находить среднее значение c2 и среднее значение c3, а также c4 в каждой из групп c1.

Я смотрю на RDD: reduceByKey, но мне не удалось точно понять, как его использовать. Есть лучший способ сделать это? Как я могу это сделать из Scala API?

ответ

2

Вы говорите, что у вас есть DataFrame, так что вы, вероятно, не следует использовать RDD API (который часто является менее эффективным, и в этом случае, вероятно, менее интуитивной либо) - вот решение с использованием DataFrame API:

import org.apache.spark.sql.functions._ 

val result = df.groupBy("c1").agg(mean("c2"), mean("c3"), mean("c4")) 

result будет DataFrame со следующей схемой (при условии, c1 является строка для начала):

root 
|-- c1: string (nullable = true) 
|-- avg(c2): double (nullable = true) 
|-- avg(c3): double (nullable = true) 
|-- avg(c4): double (nullable = true) 

EDIT:

в случае список столбцов является динамическим, вы можете легко отобразить такой список в список соответствующих «означает» и агрегировать DF, используя этот список:

val colsToCompute = List("c2", "c3", "c4") // can be loaded dynamically 
val means: Seq[Column] = colsToCompute.map(mean) 
val result = df.groupBy("c1").agg(means.head, means.tail: _*) 

Для полноты - вот решение с использованием RDD API, но:

  • Это гораздо менее кратким
  • Это гораздо сложнее «generify» для динамического ряда колонн
  • Это может выполнять хуже

Там может быть немного короче реализации, но не намного проще:

val rdd: RDD[(String, Int, Int, Int)] = ... 

val result: RDD[(String, (Double, Double, Double))] = rdd 
    .keyBy(_._1) 
    .mapValues { case (k, v1, v2, v3) => (1, v1, v2, v3) } // add base for counter 
    .reduceByKey { case ((a1, a2, a3, a4), (b1, b2, b3, b4)) => (a1+b1, a2+b2, a3+b3, a4+b4) } // sum counter and values 
    .mapValues { case (count, v1, v2, v3) => (v1.toDouble/count, v2.toDouble/count, v3.toDouble/count) } // calculate means 
+0

Хотя у меня есть Dataframe прямо сейчас, я смотрю на это делать через РДУ, потому что наконец, данные будут динамическими. Он может иметь 4 столбца или 3 столбца; Программа определит, сколько столбцов будет означать, просмотрев конфигурационный файл. –

+0

«наконец, данные будут динамическими. У него может быть 4 столбца или 3 столбца» - это тоже проще сделать с DataFrame ... Я буду обновлять свой ответ соответственно –