2017-02-20 41 views
0

Так что у меня искры RDD, который содержит значение следующего видаСпарк combineByKey на значениях, которые содержат кортежи

RDD[(Key:Int, (val1:Double, val2:Double))] 

Для примера

(1,(1,1)) 
(1,(1,2)) 
(1,(1,3)) 
(1,(1,4)) 
(1,(1,5)) 
(2,(1,1)) 
(2,(1,2)) 
(2,(1,3)) 
(2,(1,4)) 
(2,(1,5)) 

Где Int является ключом, и кортеж содержит два двухместный номер Значения

Я хочу применить операцию combByKey, где для каждой клавиши мы выполняем следующую операцию:

val2/val1 

Я в основном хочу найти разделение этих двух значений для каждого вхождения ключа, а затем найти их среднее значение для заданного ключа. Поэтому для createCombiner мы разделим значения и создадим счетчик. Для mergeValue я хочу разделить эти значения для данного ключа, а затем суммировать их с предыдущим значением и увеличивать счетчик.

И, наконец, для mergeCombiner, Суммировать значения сумматоров и разделить их на общее число значений (разделение возможно сделать в отдельную карту?)

Моя проблема, я не могу найти любой пример, когда они запускают combByKey в паре, где значения представляют собой кортеж вместо одного значения Integer.

Я пытался писать следующий код

arr2.combineByKey((v) => (v._2/v._1, 1),\\ Creating Combiner 
(acc: (Double, Int), q:(Double,Double)) => ((q._2/q._1)+acc._1,acc._2+1), \\Merging Values 
(acc1: (Double, Int), acc2: (Double, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) \\Merging Combiners 

Но это дает мне следующие ошибки для значения слияния части

type mismatch; found : ((Double, Int), (Double, Double)) => (Double, Int) required: (?, (Int, Int)) => ? 

Может кто-нибудь помочь мне здесь и помочь мне понять, что я делать неправильно. Как получить доступ к значениям кортежа и разделить их, а затем добавить их к предыдущим значениям?

Любая помощь будет высоко оценен

ответ

1

Тип объявлен q в параметре слияния является виновником, плюс вам нужно преобразовать к двойному до разделения, чтобы получить правильные значения

arr2.combineByKey((v) => (v._2.toDouble/v._1, 1), 
(acc: (Double, Int), q:(Int,Int)) => ((q._2/q._1)+acc._1,acc._2+1), 
(acc1: (Double, Int), acc2: (Double, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) 
+0

Это кажется чтобы работать нормально, если val1 и val2 являются целыми, но, как указано в вопросе, они являются Double (Мой пример, хотя и имеет их как целочисленные, поэтому я думаю, что именно здесь возникла путаница). В любом случае следующее утверждение решает проблему Благодарим за помощь :) 'combByKey ((v) => (v._2.toDouble/v._1, 1), (acc: (Double, Int), q : ((Двойной, Двойной))) => ((q._2/q._1) + acc._1, acc._2 + 1), (acc1: (Double, Int), acc2: (Double, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) ' –