Это следующий вопрос от here. Я пытаюсь реализовать k-средство на основе этого implementation. Он отлично работает, , но Я хотел бы заменить groupByKey()
на reduceByKey()
, но я не уверен, как (я не беспокоюсь о производительности сейчас). Вот соответствующий уменьшенная код:Заменить группуByKey() с помощью reduceByKey()
val data = sc.textFile("dense.txt").map(
t => (t.split("#")(0), parseVector(t.split("#")(1)))).cache()
val read_mean_centroids = sc.textFile("centroids.txt").map(
t => (t.split("#")(0), parseVector(t.split("#")(1))))
var centroids = read_mean_centroids.takeSample(false, K, 42).map(x => x._2)
do {
var closest = read_mean_centroids.map(p => (closestPoint(p._2, centroids), p._2))
var pointsGroup = closest.groupByKey() // <-- THE VICTIM :)
var newCentroids = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
..
Обратите внимание, что println(newCentroids)
даст:
Карта (23 -> (-6.269305E-4, -0,0011746404, -4.08004E-5), 8 - > (-5.108732E-4, 7.336348E-4, -3.707591E-4), 17 -> (-0,0016383086, -0,0016974678, 1,45 ..
и println(closest)
:
MapPartitionsRDD [6] на карте в kmeans.scala: 75
Соответствующий вопрос: Using reduceByKey in Apache Spark (Scala).
Некоторые documentation:
Защиту reduceByKey (FUNC: (V, V) ⇒ V): РДД [(K, V)]
Merge значения для каждого ключа используя ассоциативную функцию уменьшения.
Защиту reduceByKey (FUNC: (V, V) ⇒ V, numPartitions: Int): РДД [(K, V)]
Merge значения для каждого ключа с использованием ассоциативной снижения функции.
Защиту reduceByKey (разметки: Разметка, FUNC: (V, V) ⇒ V): РДД [(K, V)]
Merge значения для каждого ключа с помощью ассоциативной уменьшения функции.
Защиту groupByKey(): РДД [(К, Iterable [V])]
Группа значения для каждого ключа в RDD в одной последовательности.
Работал как очарование! Можете ли вы объяснить, что мы здесь сделали? Я хочу сказать, почему я хочу заменить groupByKey() на reduceByKey()?В чем основное преимущество этого? Соответственно: http://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work – gsamaras
Ну, 'groupByKey' собирается выпустить кучу вещей, которые будут отправляться между различными узлами , т.е. все значения, связанные с данным ключом, для всех ключей и частей данных. С другой стороны, с помощью метода aggregateByKey каждая часть несет ответственность только за передачу (водителю) пары, состоящей из суммы и количества. Так гораздо меньше сетевого общения, а также устраняя необходимость создания всех этих коллекций ценностей (поскольку для вычисления среднего значения важны только их сумма и количество). –
Хорошо, вот что я подумал, спасибо большое! – gsamaras