2015-03-12 2 views
1

У меня есть RDD формы:Spark: Пересечение списков не работает

t1-> (Long, List[Long]) 

и список формы

t2-> List[Long] 

мне нужно выполнить объединение и пересечение списков. Я пробую следующий код:

val t1 = a.map(x => (x._1, (List(x._2)))).reduceByKey(_ ++ _) 
val t2 = b.map(x => (x._1, (List(x._2)))).reduceByKey(_ ++ _).map(x => x._2).collect() 
val t3intersect = t1.map(x => (x._1, (x._2.intersect(t2)))) 
val t3union = t1.map(x => (x._1, (x._2.union(t2)))) 

Хотя союз возвращает правильные результаты, пересечение всегда является пустым списком. Я не могу определить проблему. Пожалуйста помоги! Благодаря!

Вот пример:

(1, List(1596, 1617, 1929, 2399, 2674)) 
(2, List(1702, 1785, 1933, 2054, 2583, 2913)) 
(3, List(1982, 2002, 2048, 2341, 2666)) 

и

List(2002, 2399) 

Это должно вернуть пересечение:

(1, List(2399)) 
(2, List()) 
(3, List(2002)) 

и союз:

(1, List(1596, 1617, 1929, 2399, 2674, 2002)) 
(2, List(1702, 1785, 1933, 2054, 2583, 2913, 2002, 2399)) 
(3, List(1982, 2002, 2048, 2341, 2666, 2399)) 
+0

Если ответы людей помогли вам как-то решить проблему, может быть, вы должны принять один. – ale64bit

ответ

1

Ваш код пересечения выглядит хорошо для меня. Он должен работать. Кроме того, попробуйте сделать это для большей ясности и, возможно, производительность:

val t3intersect = t1.mapValues(_ intersect t2) 

EDIT: Я не знаю, что это a и b, и то, что логика получения t1 и t2 от них, но если вы инициализировать t1 и t2 в Спарк РЕПЛ следующим образом, для тестирования:

scala> val t1 = sc.parallelize(List(
    | (1, List(1596, 1617, 1929, 2399, 2674)), 
    | (2, List(1702, 1785, 1933, 2054, 2583, 2913)), 
    | (3, List(1982, 2002, 2048, 2341, 2666))), 2) 
t1: org.apache.spark.rdd.RDD[(Int, List[Int])] = ParallelCollectionRDD[10] at parallelize at <console>:12 

scala> val t2 = List(2002, 2399) 
t2: List[Int] = List(2002, 2399) 

Тогда вы получить ожидаемые результаты:

scala> val tr = t1.mapValues(_ intersect t2) 
tr: org.apache.spark.rdd.RDD[(Int, List[Int])] = MappedValuesRDD[12] at mapValues at <console>:16 

scala> tr.collect() 
res13: Array[(Int, List[Int])] = Array((1,List(2399)), (2,List()), (3,List(2002))) 

Итак, следите за ситуацией в другом месте.

+0

Это не работает. Разрушила мне голову на целый день. Есть ли другой способ, которым я мог бы это сделать? – GobSmack

+0

Не могли бы вы разместить точную последовательность команд, которую вы выполняете? В том числе для создания RDD и списка. – ale64bit

+0

t1: (204, Список (2049, 2097, 2674)) t2: Список (2049, 2097, 2674) Пересечение: (204, Список()) Союз: (204, Список (2049, 2097, 2674, Список (2049, 2097, 2674))) Я просто понял, что союз дает неверные результаты. – GobSmack

1

Я воспроизведен проблематичное дело следующим образом:

object ItersectionList { 

    def main(args: Array[String]) { 
    val spConf = new SparkConf().setMaster("local[2]").setAppName("ItersectionList") 
    val sc = new SparkContext(spConf) 

    val a = Array(
     (1, List(1596, 1617, 1929, 2399, 2674)), 
     (2, List(1702, 1785, 1933, 2054, 2583, 2913)), 
     (3, List(1982, 2002, 2048, 2341, 2666)) 
    ) 

    val t2 = List(2002, 2399) 

    val t1 = sc.makeRDD(a).map(x => (x._1, (List(x._2)))).reduceByKey(_ ++ _) 
    val t3intersect = t1.map(x => (x._1, (x._2.intersect(t2)))) 
    val t3union = t1.map(x => (x._1, (x._2.union(t2)))) 

    t3intersect.foreach(println) 
    t3union.foreach(println) 

    } 
} 

И результат выглядит следующим образом:

Intersection: 
(2,List()) 
(1,List()) 
(3,List()) 

Union: 
(2,List(List(1702, 1785, 1933, 2054, 2583, 2913), 2002, 2399)) 
(1,List(List(1596, 1617, 1929, 2399, 2674), 2002, 2399)) 
(3,List(List(1982, 2002, 2048, 2341, 2666), 2002, 2399)) 

Я нашел это проблема List(x._2) в map(x => (x._1, (List(x._2)))).reduceByKey(_ ++ _), который изменяет List(a, b, c) до List(List(a, b, c)). Поскольку List(List(a, b, c)) не соответствует List(a, b, c), пересечение будет равно нулю. Вы можете удалить List() следующим образом, и результат будет правильным.

val t1 = sc.makeRDD(a).map(x => (x._1, x._2)).reduceByKey(_ ++ _) 

или

val t1 = sc.makeRDD(a).reduceByKey(_ ++ _) 
+0

Ошибка произошла из-за сбора, который я выполнял для t2. Он создал массив [list], который не соответствовал списку в t1.Thanks для справки! – GobSmack

 Смежные вопросы

  • Нет связанных вопросов^_^