2016-06-22 2 views
0

Я использую graphx api в итеративном алоризме. Хотя I have carefully cache/ unpersist rdd, and take care of the vertices partition num. Временные затраты по-прежнему увеличиваются за раунд в линейном тренде. Упрощенная версия моего кода, как следующий, и он получает ту же проблему: тенденции стоимостиSpark Graphx: Time Cost увеличивается стабильно за раунд в линейном стиле

import org.apache.log4j.{Level, Logger} 
import org.apache.spark.graphx.Graph 
import org.apache.spark.graphx.util.GraphGenerators 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 

import scala.collection.mutable.ArrayBuffer 


object ComputingTimeProblem extends App { 

    Logger.getLogger("org").setLevel(Level.ERROR) 
    Logger.getLogger("akka").setLevel(Level.ERROR) 
    val conf = new SparkConf().setMaster("local[1]").setAppName("test") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    var graph = GraphGenerators 
     .logNormalGraph(sc, 15000).mapVertices((_, _) => 1d) 
     .cache 
    graph.vertices.take(10).foreach(println) 

    val maxIter = 50 

    var preGraph: Graph[Double, Int] = null 
    var allTime: ArrayBuffer[Double] = ArrayBuffer() 
    for (i <- 1 to maxIter) { 

     val begin = System.currentTimeMillis() 

     preGraph = graph 

     val vertices2 = graph.triplets.map(tri => (tri.srcId, tri.dstAttr)).reduceByKey(_ + _) 
     graph = graph.joinVertices(vertices2)((vid, left, right) => left + right).cache 
     graph.vertices.take(10) 

     preGraph.unpersist() 

     val end = System.currentTimeMillis() 

     val duration = (end - begin)/(60 * 1000d) 
     allTime += duration 
     println(s"Round ${i} Time Cost: %.4f min, Vertices Partition Num: %d".format(
      duration, graph.vertices.getNumPartitions)) 
    } 

    graph.vertices.take(10).foreach(println) 

    val avgTime = allTime.sum/allTime.size 
    println(s"Average Time = ${avgTime}") 

    val timeCostDiffs = for (i <- 1 until maxIter) yield (allTime(i) - allTime(i - 1)) 
    timeCostDiffs 
     .zipWithIndex 
     .map(x => "Round %d to %d, Time Cost Diff: %.4f min".format(x._2+1, x._2 + 2, x._1)) 
     .foreach(println) 

    println("tc\n"+allTime.mkString("\n")) 
} 

как раз после enter image description here

Я не изменил индекс объекта графа, и Graphx бы присоединиться вершины по методу leftZipJoin, которые не требуют перетасовки, так почему временные затраты все равно увеличиваются за раунд. Может кто-нибудь дать некоторые конструктивные варианты, спасибо ?!

ответ

1

Это еще проблема линии, я только что нашел. Объект Graph имеет два rdd: vertex rdd и edge rdd. В приведенном выше коде я только что материализовал вершину rdd, а не край rdd. Итак, каждый раунд, он будет пересчитывать предыдущие ребра агагина. Таким образом, материализация как rdd с triplets объект решит проблему, а именно:

import org.apache.log4j.{Level, Logger} 
import org.apache.spark.graphx.Graph 
import org.apache.spark.graphx.util.GraphGenerators 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.{SparkConf, SparkContext} 

import scala.collection.mutable.ArrayBuffer 


object ComputingTimeProblem extends App { 

    Logger.getLogger("org").setLevel(Level.ERROR) 
    Logger.getLogger("akka").setLevel(Level.ERROR) 
    val conf = new SparkConf().setMaster("local[1]").setAppName("test") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 

    var graph = GraphGenerators 
     .logNormalGraph(sc, 15000).mapVertices((_, _) => 1d) 
     //  .partitionBy(PartitionStrategy.RandomVertexCut,8) 
     .cache 
    graph.vertices.take(10).foreach(println) 

    val maxIter = 50 

    var preGraph: Graph[Double, Int] = null 
    var allTime: ArrayBuffer[Double] = ArrayBuffer() 
    for (i <- 1 to maxIter) { 
     val begin = System.currentTimeMillis() 

     preGraph = graph 

     val vertices2 = graph.triplets.map(tri => (tri.srcId, tri.dstAttr)).reduceByKey(_ + _) 
     graph = graph.joinVertices(vertices2)((vid, left, right) => left + right).cache 
     graph.triplets.take(10) // here materialize both vertex and edge rdd 
     // graph.vertices.take(10) 

     preGraph.unpersist() 

     val end = System.currentTimeMillis() 

     val duration = (end - begin)/(60 * 1000d) 
     allTime += duration 
     println(s"Round ${i} Time Cost: %.4f min, Vertices Partition Num: %d".format(
      duration, graph.vertices.getNumPartitions)) 
    } 

    graph.vertices.take(10).foreach(println) 

    val avgTime = allTime.sum/allTime.size 
    println(s"Average Time = ${avgTime}") 

    val timeCostDiffs = for (i <- 1 until maxIter) yield (allTime(i) - allTime(i - 1)) 
    timeCostDiffs 
     .zipWithIndex 
     .map(x => "Round %d to %d, Time Cost Diff: %.4f min".format(x._2 + 1, x._2 + 2, x._1)) 
     .foreach(println) 


    println("tc\n" + allTime.mkString("\n")) 

} 

 Смежные вопросы

  • Нет связанных вопросов^_^