2014-10-26 3 views
0

У меня есть два RDD: points и pointsWithinEps. Каждая точка в points представляет координату x, y. pointsWithinEps представляют две точки и расстояние между ними: ((x, y), distance). Я хочу, чтобы петля все точки и для каждого точечного фильтра только те элементы, которые находятся в pointsWithinEps как x (первая) координата. Поэтому я делаю следующее:Выполнение кода внутри Spark foreach

points.foreach(p => 
     val distances = pointsWithinEps.filter{ 
     case((x, y), distance) => x == p 
     } 
     if (distances.count() > 3) { 
//  do some other actions 
     } 
    ) 

Но этот синтаксис недействителен. Насколько я понимаю, не допускается создавать переменные внутри Spark foreach. Должен ли я делать что-то подобное?

for (i <- 0 to points.count().toInt) { 
    val p = points.take(i + 1).drop(i) // take the point 
    val distances = pointsWithinEps.filter{ 
    case((x, y), distance) => x == p 
    } 
    if (distances.count() > 3) { 
    //  do some other actions 
    } 
} 

Или есть лучший способ сделать это? Полный код размещается здесь: https://github.com/timasjov/spark-learning/blob/master/src/DBSCAN.scala

EDIT:

points.foreach({ p => 
    val pointNeighbours = pointsWithinEps.filter { 
    case ((x, y), distance) => x == p 
    } 
    println(pointNeighbours) 
}) 

Сейчас я следующий код, но он бросает NullPointerException (pointsWithinEps). Как это можно исправить, почему pointsWithinEps имеет значение null (перед тем, как в нем есть элементы)?

+0

ли я понимаю правильно, что для каждой точки (х, у) на 'points', вы хотите, чтобы все ((Икс , y), расстояние) кортежей из 'pointsWithinEps', которые начинаются с того же (x)? – maasg

+0

есть. В основном для каждой точки я хочу найти, какие другие точки являются ее соседями (точки, находящиеся внутри эпсилона). В моем случае это сама точка и х в структуре ((x, y), расстояние). Код находится в github, так что, например, вы можете его выполнить, а в отладчике точно найдите значения. – Bob

ответ

2

Для того, чтобы собрать все точки расстояния, которые начинаются на заданную координату, простым раздельным способом сделать это было бы ввести точки по этой координате x и сгруппировать их по этой клавише, например:

val pointsWithinEpsByX = pointsWithinEps.map{case ((x,y),distance) => (x,((x,y),distance))} 
val xCoordinatesWithDistance = pointsWithinEpsByX.groupByKey 

Тогда левая присоединиться к RDD точек с результатом предыдущего преобразования:

val pointsWithCoordinatesWithDistance = points.leftOuterJoin(xCoordinatesWithDistance) 
+0

Компилятор показывает, что там нет метода groupByKey, только groupBy. То же самое и для метода leftOuterJoin. Я использую искру 1.1.0, предварительно построенную для Hadoop 1.X – Bob

+0

Кроме того, следует ли положить его в петлю foreach? – Bob

+2

Это функции, доступные для RDD пар (ключ, значение) через неявное преобразование. Импортируйте 'org.apache.spark.SparkContext._' в верхней части своей программы, чтобы использовать эти функции. - Кроме того, нет необходимости в цикле. Этот функциональный конвейер выполняет свою работу, применяя преобразования и группировку ко всему набору данных. – maasg

0

Объявление переменных означает, что у вас есть блок, а не только выражение, поэтому вам нужно использовать фигурные скобки {}, например.

point.foreach({p => ... }) 
+0

Спасибо! Но можете ли вы также помочь с функциональностью, которую я описал? – Bob