Для исследовательского проекта я попытался отсортировать элементы в RDD. Я сделал это в двух разных подходах.Сортировка элементов RDD
В первом методе я применил функцию mapPartitions() на RDD, чтобы он сортировал содержимое RDD и предоставлял результат RDD, который содержит отсортированный список как единственную запись в RDD. Затем я применил функцию уменьшения, которая в основном объединяет отсортированные списки.
Я провел эти эксперименты на кластере EC2, содержащем 30 узлов. Я установил его с использованием сценария искры ec2. Файл данных был сохранен в HDFS.
Во втором подходе я использовал метод sortBy в Spark.
Я выполнил эти операции по данным переписи США (100MB) обнаружили here
А одиночные линии выглядит как этот
9, Not in universe, 0, 0, Children, 0, Not in universe, Never married, Not in universe or children, Not in universe, White, All other, Female, Not in universe, Not in universe, Children or Armed Forces, 0, 0, 0, Nonfiler, Not in universe, Not in universe, Child <18 never marr not in subfamily, Child under 18 never married, 1758.14, Nonmover, Nonmover, Nonmover, Yes, Not in universe, 0, Both parents present, United-States, United-States, United-States, Native- Born in the United States, 0, Not in universe, 0, 0, 94, - 50000.
Я отсортированные на основе 25-значения в CSV. В этой строке 1758.14.
Я заметил, что sortBy выполняет хуже, чем другой метод. Это ожидаемый сценарий? Если это так, почему бы mapPartitions() и reduce() не использовать метод сортировки по умолчанию?
Вот моя реализация
public static void sortBy(JavaSparkContext sc){
JavaRDD<String> rdd = sc.textFile("/data.txt",32);
long start = System.currentTimeMillis();
rdd.sortBy(new Function<String, Double>(){
@Override
public Double call(String v1) throws Exception {
// TODO Auto-generated method stub
String [] arr = v1.split(",");
return Double.parseDouble(arr[24]);
}
}, true, 9).collect();
long end = System.currentTimeMillis();
System.out.println("SortBy: " + (end - start));
}
public static void sortList(JavaSparkContext sc){
JavaRDD<String> rdd = sc.textFile("/data.txt",32); //parallelize(l, 8);
long start = System.currentTimeMillis();
JavaRDD<LinkedList<Tuple2<Double, String>>> rdd3 = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, LinkedList<Tuple2<Double, String>>>(){
@Override
public Iterable<LinkedList<Tuple2<Double, String>>> call(Iterator<String> t)
throws Exception {
// TODO Auto-generated method stub
LinkedList<Tuple2<Double, String>> lines = new LinkedList<Tuple2<Double, String>>();
while(t.hasNext()){
String s = t.next();
String arr1[] = s.split(",");
Tuple2<Double, String> t1 = new Tuple2<Double, String>(Double.parseDouble(arr1[24]),s);
lines.add(t1);
}
Collections.sort(lines, new IncomeComparator());
LinkedList<LinkedList<Tuple2<Double, String>>> list = new LinkedList<LinkedList<Tuple2<Double, String>>>();
list.add(lines);
return list;
}
});
rdd3.reduce(new Function2<LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>, LinkedList<Tuple2<Double, String>>>(){
@Override
public LinkedList<Tuple2<Double, String>> call(
LinkedList<Tuple2<Double, String>> a,
LinkedList<Tuple2<Double, String>> b) throws Exception {
// TODO Auto-generated method stub
LinkedList<Tuple2<Double, String>> result = new LinkedList<Tuple2<Double, String>>();
while (a.size() > 0 && b.size() > 0) {
if (a.getFirst()._1.compareTo(b.getFirst()._1) <= 0)
result.add(a.poll());
else
result.add(b.poll());
}
while (a.size() > 0)
result.add(a.poll());
while (b.size() > 0)
result.add(b.poll());
return result;
}
});
long end = System.currentTimeMillis();
System.out.println("MapPartitions: " + (end - start));
}
Это может быть лучший вопрос для списка рассылки. –