Возможно ли комбинировать GraphX и DataFrames? Я хочу, чтобы для каждого узла в Графе собственный DataFrame. Я знаю, что GraphX и DataFrame расширяют RDD и вложенные RDD не возможны, и SparkContext не является Serializable. Но в Spark 2.0.0 я увидел, что SparkSession является Serializable. Я пробовал, но он все еще не работает. Я также пытался хранить глобальные DataFrames в массиве. Но я не могу получить доступ к массиву в рабочей области. Игнорировать методы sendMsg и слияние:Спарк-комбинат DataFrames и GraphX
object Main{
def main(args: Array[String]) : Unit = {
val spark = SparkSession
.builder
.appName("ScalaGraphX_SQL")
.master("spark://home:7077")
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
val node_pair : RDD[(Array[String],Long)] = sc.textFile(args(0)).map(l=>l.split(" ")).zipWithIndex()
//set array size
Tables.tables = new Array[Dataset[Row]](node_pair.count().toInt)
//insert dataframe inside array tables
node_pair.collect().foreach{ case (arr,l) => {
val fields = arr.takeRight(arr.length-2).map(fieldName => StructField(fieldName, BooleanType, nullable = true))
val schema = StructType(fields)
val rows = new util.ArrayList[Row]
Tables.tables{l.toInt} = spark.createDataFrame(rows, schema)
//val f =
}
}
//create vertices
val vertices : RDD[(VertexId,TreeNode)]= node_pair.map{ case (arr,l) => {
(l,new TreeNode(l,false))
}
}
//create edges
val edges : RDD[Edge[Boolean]] = node_pair
.filter{ case (arr,l) => arr(0).toLong != -1}
.map{ case (arr,l) => Edge(l,arr(0).toLong,true)
}
var init_node : TreeNode = new TreeNode(-1,false)
val graph = Graph(vertices,edges,init_node)
val graph_pregel = Pregel(graph,init_node,Int.MaxValue,EdgeDirection.Out)(vProg,sendMsg,merge)
graph_pregel.vertices.collect().foreach(v => println(v._2.index))
}
def vProg(id:VertexId, act: TreeNode, other: TreeNode): TreeNode = {
println(Tables.tables{act.index.toInt})
act
}
def sendMsg(et : EdgeTriplet[TreeNode,Boolean]) : Iterator[(VertexId, TreeNode)] = {
if(et.srcAttr.v){
println(et.srcId + "--->" + et.dstId)
Iterator((et.dstId,et.srcAttr))
}else{
//println(et.srcId + "-/->" + et.dstId)
Iterator.empty
}
}
def merge(n1:TreeNode, n2:TreeNode): TreeNode = {
n1
}
}
object Tables extends Serializable{
var tables : scala.Array[Dataset[Row]] = null
}
class TreeNode(val index:Long, var v: Boolean) extends Serializable {
}
Может быть, есть возможность получить доступ к глобальному массиву с РДОМ? Или у кого-то есть другое решение этой проблемы?
Проблема не и не была сериализацией. Не сериализуемое является лишь намеком здесь, что указывает на основную проблему, что архитектура Spark не подходит для вложенной обработки без существенного ограничения модели программирования. Поэтому просто потому, что вы можете сериализовать 'SparkSession' (вы можете сериализовать' SQLContext' в 1.x одинаково), это не значит, что что-то изменилось. – zero323