Я использую graphx api в итеративном алоризме. Хотя I have carefully cache/ unpersist rdd, and take care of the vertices partition num. Временные затраты по-прежнему увеличиваются за раунд в линейном тренде. Упрощенная версия моего кода, как следующий, и он получает ту же проблему: тенденции стоимостиSpark Graphx: Time Cost увеличивается стабильно за раунд в линейном стиле
import org.apache.log4j.{Level, Logger}
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
object ComputingTimeProblem extends App {
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
val conf = new SparkConf().setMaster("local[1]").setAppName("test")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var graph = GraphGenerators
.logNormalGraph(sc, 15000).mapVertices((_, _) => 1d)
.cache
graph.vertices.take(10).foreach(println)
val maxIter = 50
var preGraph: Graph[Double, Int] = null
var allTime: ArrayBuffer[Double] = ArrayBuffer()
for (i <- 1 to maxIter) {
val begin = System.currentTimeMillis()
preGraph = graph
val vertices2 = graph.triplets.map(tri => (tri.srcId, tri.dstAttr)).reduceByKey(_ + _)
graph = graph.joinVertices(vertices2)((vid, left, right) => left + right).cache
graph.vertices.take(10)
preGraph.unpersist()
val end = System.currentTimeMillis()
val duration = (end - begin)/(60 * 1000d)
allTime += duration
println(s"Round ${i} Time Cost: %.4f min, Vertices Partition Num: %d".format(
duration, graph.vertices.getNumPartitions))
}
graph.vertices.take(10).foreach(println)
val avgTime = allTime.sum/allTime.size
println(s"Average Time = ${avgTime}")
val timeCostDiffs = for (i <- 1 until maxIter) yield (allTime(i) - allTime(i - 1))
timeCostDiffs
.zipWithIndex
.map(x => "Round %d to %d, Time Cost Diff: %.4f min".format(x._2+1, x._2 + 2, x._1))
.foreach(println)
println("tc\n"+allTime.mkString("\n"))
}
Я не изменил индекс объекта графа, и Graphx бы присоединиться вершины по методу leftZipJoin, которые не требуют перетасовки, так почему временные затраты все равно увеличиваются за раунд. Может кто-нибудь дать некоторые конструктивные варианты, спасибо ?!