2015-06-09 1 views
0

Для исследовательского проекта я попытался отсортировать элементы в RDD. Я сделал это в двух разных подходах.Сортировка элементов RDD

В первом методе я применил функцию mapPartitions() на RDD, чтобы он сортировал содержимое RDD и предоставлял результат RDD, который содержит отсортированный список как единственную запись в RDD. Затем я применил функцию уменьшения, которая в основном объединяет отсортированные списки.

Я провел эти эксперименты на кластере EC2, содержащем 30 узлов. Я установил его с использованием сценария искры ec2. Файл данных был сохранен в HDFS.

Во втором подходе я использовал метод sortBy в Spark.

Я выполнил эти операции по данным переписи США (100MB) обнаружили here

А одиночные линии выглядит как этот

9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not in universe or children, Not in universe, White, All other, Female, Not in universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not in universe, Not in universe, Child <18 never marr not in subfamily, Child under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in universe, 0, Both parents present, United-States, United-States, United-States, Native- Born in the United States, 0, Not in universe, 0, 0, 94, - 50000. 

Я отсортированные на основе 25-значения в CSV. В этой строке 1758.14.

Я заметил, что sortBy выполняет хуже, чем другой метод. Это ожидаемый сценарий? Если это так, почему бы mapPartitions() и reduce() не использовать метод сортировки по умолчанию?

Вот моя реализация

public static void sortBy(JavaSparkContext sc){ 
     JavaRDD<String> rdd = sc.textFile("/data.txt",32); 
     long start = System.currentTimeMillis(); 
     rdd.sortBy(new Function<String, Double>(){ 

      @Override 
       public Double call(String v1) throws Exception { 
         // TODO Auto-generated method stub 
        String [] arr = v1.split(","); 
        return Double.parseDouble(arr[24]); 
       } 
     }, true, 9).collect(); 
     long end = System.currentTimeMillis(); 
     System.out.println("SortBy: " + (end - start)); 
    } 

public static void sortList(JavaSparkContext sc){ 
     JavaRDD<String> rdd = sc.textFile("/data.txt",32); //parallelize(l, 8); 
     long start = System.currentTimeMillis(); 
     JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, LinkedList<Tuple2<Double, String>>>(){ 

     @Override 
     public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t) 
      throws Exception { 
      // TODO Auto-generated method stub 
      LinkedList<Tuple2<Double, String>> lines = new LinkedList<Tuple2<Double, String>>(); 
      while(t.hasNext()){  
      String s = t.next(); 
      String arr1[] = s.split(","); 
      Tuple2<Double, String> t1 = new Tuple2<Double, String>(Double.parseDouble(arr1[24]),s); 
      lines.add(t1); 
      } 
      Collections.sort(lines, new IncomeComparator()); 
      LinkedList<LinkedList<Tuple2<Double, String>>> list = new LinkedList<LinkedList<Tuple2<Double, String>>>(); 
      list.add(lines); 
      return list; 
     } 

     }); 
     rdd3.reduce(new Function2<LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>>(){ 

     @Override 
     public LinkedList<Tuple2<Double, String>> call(
       LinkedList<Tuple2<Double, String>> a, 
       LinkedList<Tuple2<Double, String>> b) throws Exception { 
      // TODO Auto-generated method stub 
      LinkedList<Tuple2<Double, String>> result = new LinkedList<Tuple2<Double, String>>(); 
      while (a.size() > 0 && b.size() > 0) { 

      if (a.getFirst()._1.compareTo(b.getFirst()._1) <= 0) 
       result.add(a.poll()); 
      else 
       result.add(b.poll()); 
      } 

      while (a.size() > 0) 
      result.add(a.poll()); 

      while (b.size() > 0) 
      result.add(b.poll()); 

      return result; 

     } 

     });  
     long end = System.currentTimeMillis(); 
     System.out.println("MapPartitions: " + (end - start)); 
    } 
+0

Это может быть лучший вопрос для списка рассылки. –

ответ

0

Collect() является узким местом, как он возвращает все результаты водителя.
Он производит как IO-хит & дополнительный сетевой трафик на одиночный источник (в данном случае - драйвер).
Он также блокирует другие операции.

Вместо collect() в первом SortBy() сегмент кода, попытайтесь выполнить параллельную операцию, как saveAsTextFile(tmp) чем считаны с помощью sc.textFile(tmp).

В другом сегменте кода sortBy() используются как mapPartitions(), так и reduce() параллельные API-интерфейсы, поэтому работа выполняется полностью параллельно.
Казалось бы, это является причиной разницы в времени выполнения от конца до конца.

Обратите внимание, что ваши результаты не обязательно означают, что сумма времени выполнения на всех машинах хуже.

 Смежные вопросы

  • Нет связанных вопросов^_^