2017-01-13 8 views
0

Мне нужно получить все столбцы вместе с count.In Scala RDD.Scala RDD groupby count вместе со всеми столбцами

Col1 col2 col3 col4 
us A  Q1 10 
us A  Q3 10 
us A  Q2 20 
us B  Q4 10 
us B  Q5 20 
uk A  Q1 10 
uk A  Q3 10 
uk A  Q2 20 
uk B  Q4 10 
uk B  Q5 20 

Я хочу привести как:

Col1 col2  col3  col4  count 
us   A   Q1  10   3 
us   A   Q3  10   3 
us   A   Q3  10   3 
us   B   Q4  10   2 
us   B   Q5  20   2 
uk   A   Q1  10   3 
uk   A   Q3  10   3 
uk   A   Q3  10   3 
uk   B   Q4  10   2 
uk   B   Q5  20   2 

Это что-то вроде группы по col1 из, col2 и получает отсчеты. Теперь мне нужно вместе с col13, col4.

Я пробуя SCALA RDD нравится:

val Top_RDD_1 = RDD.groupBy(f=> (f._1,f._2)).mapValues(_.toList) 

Это производит

RDD[((String, String), List[(String, String, String, Double, Double, Double)])] 

Ничего, кроме (col1, col2), список (col1, col2, col3, col14) результат, как (мы, A) Список ((us, a, Q1,10), (us, a, Q3,10), (us, a, Q2,20)). ,,,

Как я могу взять список подсчитывать и получать доступ к значению списка.

Пожалуйста, помогите мне исправить код SCALA RDD.

Thanks Balaji.

ответ

1

Я не вижу способ сделать это в одном «сканировании» RDD - вам нужно будет вычислить подсчеты с помощью reduceByKey, а затем join до исходного RDD. Для того, чтобы эффективно сделать это (не вызывая повторное вычисление входа), вы бы лучше cache/persist входа до присоединения:

val keyed: RDD[((String, String), (String, String, String, Int))] = input 
    .keyBy { case (c1, c2, _, _) => (c1, c2) } 
    .cache() 

val counts: RDD[((String, String), Int)] = keyed.mapValues(_ => 1).reduceByKey(_ + _) 

val result = keyed.join(counts).values.map { 
    case ((c1, c2, c3, c4), count) => (c1, c2, c3, c4, count) 
}