2016-09-19 3 views
1

Возможно ли комбинировать GraphX ​​и DataFrames? Я хочу, чтобы для каждого узла в Графе собственный DataFrame. Я знаю, что GraphX ​​и DataFrame расширяют RDD и вложенные RDD не возможны, и SparkContext не является Serializable. Но в Spark 2.0.0 я увидел, что SparkSession является Serializable. Я пробовал, но он все еще не работает. Я также пытался хранить глобальные DataFrames в массиве. Но я не могу получить доступ к массиву в рабочей области. Игнорировать методы sendMsg и слияние:Спарк-комбинат DataFrames и GraphX ​​

object Main{ 
    def main(args: Array[String]) : Unit = {  
    val spark = SparkSession 
     .builder 
     .appName("ScalaGraphX_SQL") 
     .master("spark://home:7077") 
     .enableHiveSupport() 
     .getOrCreate() 

    val sc = spark.sparkContext 

    val node_pair : RDD[(Array[String],Long)] = sc.textFile(args(0)).map(l=>l.split(" ")).zipWithIndex() 

    //set array size 
    Tables.tables = new Array[Dataset[Row]](node_pair.count().toInt) 

    //insert dataframe inside array tables 
    node_pair.collect().foreach{ case (arr,l) => { 
     val fields = arr.takeRight(arr.length-2).map(fieldName => StructField(fieldName, BooleanType, nullable = true)) 
     val schema = StructType(fields) 
     val rows = new util.ArrayList[Row] 
     Tables.tables{l.toInt} = spark.createDataFrame(rows, schema) 
     //val f = 
     } 
    } 

    //create vertices 
    val vertices : RDD[(VertexId,TreeNode)]= node_pair.map{ case (arr,l) => { 
     (l,new TreeNode(l,false)) 
    } 
    } 

    //create edges 
    val edges : RDD[Edge[Boolean]] = node_pair 
     .filter{ case (arr,l) => arr(0).toLong != -1} 
     .map{ case (arr,l) => Edge(l,arr(0).toLong,true) 
     } 

    var init_node : TreeNode = new TreeNode(-1,false) 
    val graph = Graph(vertices,edges,init_node) 
    val graph_pregel = Pregel(graph,init_node,Int.MaxValue,EdgeDirection.Out)(vProg,sendMsg,merge) 

    graph_pregel.vertices.collect().foreach(v => println(v._2.index)) 
    } 

    def vProg(id:VertexId, act: TreeNode, other: TreeNode): TreeNode = { 
    println(Tables.tables{act.index.toInt}) 
    act 
    } 

    def sendMsg(et : EdgeTriplet[TreeNode,Boolean]) : Iterator[(VertexId, TreeNode)] = { 

    if(et.srcAttr.v){ 
     println(et.srcId + "--->" + et.dstId) 
     Iterator((et.dstId,et.srcAttr)) 
    }else{ 
     //println(et.srcId + "-/->" + et.dstId) 
     Iterator.empty 
    } 
    } 

    def merge(n1:TreeNode, n2:TreeNode): TreeNode = { 
    n1 
    } 
} 

object Tables extends Serializable{ 
    var tables : scala.Array[Dataset[Row]] = null 
} 

class TreeNode(val index:Long, var v: Boolean) extends Serializable { 
} 

Может быть, есть возможность получить доступ к глобальному массиву с РДОМ? Или у кого-то есть другое решение этой проблемы?

+0

Проблема не и не была сериализацией. Не сериализуемое является лишь намеком здесь, что указывает на основную проблему, что архитектура Spark не подходит для вложенной обработки без существенного ограничения модели программирования. Поэтому просто потому, что вы можете сериализовать 'SparkSession' (вы можете сериализовать' SQLContext' в 1.x одинаково), это не значит, что что-то изменилось. – zero323

ответ

1

Пожалуйста, взгляните на GraphFrames - это пакет, который предоставляет API DataFrame для GraphX. GraphFrames будет рассматриваться для включения в Spark, когда он предоставляет такие функции, как разбиение на разделы, что важно в GraphX, и когда API тестируется более исчерпывающе.

Для задачи, описанной в комментариях ниже, у вас есть один DataFrame с узлами, т.е. аэропорты:

val airports = sqlContext.createDataFrame(List(
    ("A1", "Wrocław"), 
    ("A2", "London"), 
    ("A3", "NYC") 
)).toDF("id", "name") 

ID уникален. Вы можете создать другой DataFrame, т. Е. DetailsDF, со структурой вроде: ID | AirPortID | other data. Тогда у вас есть «Один ко многим» и один аэропорт (так что VertFlele GraphFrame), у вас есть много записей в деталяхDF. Теперь вы можете запросить: spark.sql("select a.name, d.id as detailID from airports a join detailsDF d on a.id = d.airportID");. Вы также можете иметь много столбцов в аэропортах DataFrame, если вы хотите сохранить там дополнительную информацию.

+0

Спасибо, но arent GraphFrames, графики, структурированные как DataFrames? Мне нужен График с DataFrames внутри узлов. Это как таблица для каждого узла. Или я неправильно понял GraphFrames? –

+0

Да и не :) В GraphFrames для каждого узла есть таблица (DataFrame). Однако этот узел может иметь некоторый идентификатор, а затем может быть другой DataFrame, т. Е. NodeDetails, который будет иметь столбец «baseNodeId». Тогда у вас может быть много строк для одного узла. –

+0

Прошу прощения, но я не понимаю. Разве это не вложенный DataFrame? Можете ли вы дать мне короткий пример? Большое спасибо! –