2

Это следующий вопрос от here. Я пытаюсь реализовать k-средство на основе этого implementation. Он отлично работает, , но Я хотел бы заменить groupByKey() на reduceByKey(), но я не уверен, как (я не беспокоюсь о производительности сейчас). Вот соответствующий уменьшенная код:Заменить группуByKey() с помощью reduceByKey()

val data = sc.textFile("dense.txt").map(
     t => (t.split("#")(0), parseVector(t.split("#")(1)))).cache() 

val read_mean_centroids = sc.textFile("centroids.txt").map(
     t => (t.split("#")(0), parseVector(t.split("#")(1)))) 
var centroids = read_mean_centroids.takeSample(false, K, 42).map(x => x._2) 
do { 
    var closest = read_mean_centroids.map(p => (closestPoint(p._2, centroids), p._2)) 
    var pointsGroup = closest.groupByKey() // <-- THE VICTIM :) 
    var newCentroids = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap() 
    .. 

Обратите внимание, что println(newCentroids) даст:

Карта (23 -> (-6.269305E-4, -0,0011746404, -4.08004E-5), 8 - > (-5.108732E-4, 7.336348E-4, -3.707591E-4), 17 -> (-0,0016383086, -0,0016974678, 1,45 ..

и println(closest):

MapPartitionsRDD [6] на карте в kmeans.scala: 75

Соответствующий вопрос: Using reduceByKey in Apache Spark (Scala).


Некоторые documentation:

Защиту reduceByKey (FUNC: (V, V) ⇒ V): РДД [(K, V)]

Merge значения для каждого ключа используя ассоциативную функцию уменьшения.

Защиту reduceByKey (FUNC: (V, V) ⇒ V, numPartitions: Int): РДД [(K, V)]

Merge значения для каждого ключа с использованием ассоциативной снижения функции.

Защиту reduceByKey (разметки: Разметка, FUNC: (V, V) ⇒ V): РДД [(K, V)]

Merge значения для каждого ключа с помощью ассоциативной уменьшения функции.

Защиту groupByKey(): РДД [(К, Iterable [V])]

Группа значения для каждого ключа в RDD в одной последовательности.

ответ

5

Вы можете использовать aggregateByKey() (немного более естественным, чем reduceByKey()), как это вычислить newCentroids:

val newCentroids = closest.aggregateByKey((Vector.zeros(dim), 0L))(
    (agg, v) => (agg._1 += v, agg._2 + 1L), 
    (agg1, agg2) => (agg1._1 += agg2._1, agg1._2 + agg2._2) 
).mapValues(agg => agg._1/agg._2).collectAsMap 

Для этой работы вам необходимо будет вычислить размерность данных, т.е. dim, но вам нужно только сделать это один раз. Вероятно, вы могли бы использовать что-то вроде val dim = data.first._2.length.

+0

Работал как очарование! Можете ли вы объяснить, что мы здесь сделали? Я хочу сказать, почему я хочу заменить groupByKey() на reduceByKey()?В чем основное преимущество этого? Соответственно: http://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work – gsamaras

+1

Ну, 'groupByKey' собирается выпустить кучу вещей, которые будут отправляться между различными узлами , т.е. все значения, связанные с данным ключом, для всех ключей и частей данных. С другой стороны, с помощью метода aggregateByKey каждая часть несет ответственность только за передачу (водителю) пары, состоящей из суммы и количества. Так гораздо меньше сетевого общения, а также устраняя необходимость создания всех этих коллекций ценностей (поскольку для вычисления среднего значения важны только их сумма и количество). –

+0

Хорошо, вот что я подумал, спасибо большое! – gsamaras