2016-05-17 3 views
2

Я хочу проверить, является ли новый граф (называемый A) субграфом другого графика (называемого B). И я пишу немного демо для теста, но не удалось! Я бег дем только на свечу-оболочке, свечу версии 1.6.1:Как использовать маску функции Spark graph?

// Build the GraphB 
val usersB = sc.parallelize(Array(
    (3L, ("rxin", "student")), 
    (7L, ("jgonzal","postdoc")), 
    (5L, ("franklin", "prof")), 
    (2L, ("istoica", "prof")) 
)) 

val relationshipsB = sc.parallelize(Array(
    Edge(3L, 7L, "collab"), 
    Edge(5L, 3L, "advisor"), 
    Edge(2L, 5L, "colleague"), 
    Edge(5L, 7L, "pi") 
)) 

val defaultUser = ("John Doe", "Missing") 

val graphB = Graph(usersB, relationshipsB, defaultUser) 

// Build the initial Graph A 
val usersA = sc.parallelize(Array(
    (3L, ("rxin", "student")), 
    (7L, ("jgonzal", "postdoc")), 
    (5L, ("franklin", "prof")) 
)) 

val relationshipsA = sc.parallelize(Array(
    Edge(3L, 7L, "collab"), 
    Edge(5L, 3L, "advisor") 
)) 

val testGraphA = Graph(usersA, relationshipsA, defaultUser) 

//do the mask 
val maskResult = testGraphA.mask(graphB) 
maskResult.edges.count 
maskResult.vertices.count 

В моем понимании API on spark website, маска Funciton может получить все то же ребро и вершину. Тем не менее, результат вершин только правильный (maskResult.vertices.count = 3), количество ребер должно быть 2, но не (maskResult.edges.count = 0).

ответ

2

Если вы посмотрите на the source, вы увидите, что mask использует EdgeRDD.innerJoin. Если вы посмотрите на the documentation для innerJoin, вы увидите предостережение:

Внутренних соединения этой EdgeRDD с другим EdgeRDD, предполагая, что оба распределял с использованием того же PartitionStrategy.

Вам необходимо будет создать и использовать PartitionStrategy. Если вы сделаете следующее, то получите результаты, которые вы хотите (но, вероятно, не очень хорошо масштабируется):

object MyPartStrat extends PartitionStrategy { 
    override def getPartition(s: VertexId, d: VertexId, n: PartitionID) : PartitionID = { 
    1  // this is just to prove the point, you'll need a real partition strategy 
    } 
} 

Тогда, если вы:

val maskResult = testGraphA.partitionBy(MyPartStrat).mask(graphB.partitionBy(MyPartStrat)) 

Вы получите результат, который вы хотите. Но, как я уже сказал, вам, вероятно, нужно найти лучшую стратегию разделения, чем просто набивать все в один раздел.

+0

Хороший ответ. Я бы добавил, что он может выбрать одну из предварительно упакованных стратегий разделов, которые можно найти здесь (http://spark.apache.org/docs/1.5.1/api/scala/index.html#org. apache.spark.graphx.PartitionStrategy $). Таким образом, возможно, ему не нужно на самом деле создавать его, он может использовать как 'testGraphA.partitionBy (PartitionStrategy.CanonicalRandomVertexCut)' –

+1

Ницца, добавит к моему ответу позже –

 Смежные вопросы

  • Нет связанных вопросов^_^