2016-12-14 7 views
0

Я хочу читать несколько файлов, подсчитывать повторяющиеся строки, сортировать строки по количеству повторений, принимать 10 самых повторяющихся строк.Извлечение первых элементов из нескольких отсортированных разделов

lines = env.readTextFile("logs-dir") 
tuples = lines.map(line -> Tuple2(line, 1)) 
aggregate = tuples.groupBy(0).sum(1) 
sort = aggregate.sortPartition(1, Order.DESCENDING) 
sorted.first(10).writeAsText("domains") 

Проблема в том, что first-n является произвольным и возвращает случайные 10 первых элементов из всех разделов.

Есть ли способ выбрать отсортированные элементы первого-первого из всех разделов без уменьшения параллелизма до 1?

ответ

1

Я решил бы эту проблему с параллельным MapPartitionFunction, который возвращает первые 10 элементов каждого раздела, отправляет результат в один раздел, сортирует его и снова принимает первые 10. Это будет выглядеть следующим образом:

lines = env.readTextFile("logs-dir") 
tuples = lines.map(line -> Tuple2(line, 1)) 
aggregate = tuples.groupBy(0).sum(1) 

// sort partitions in parallel 
sortPart = aggregate.sortPartition(1, Order.DESCENDING) 
// take first 10 of each partition 
firstPart = sortPart.mapPartition(new First(10)) 

// sort all in one partition 
sortFull = firstPart.sortPartition(1, Order.DESCENDING).parallelism(1) 
// take first 10 
first10 = sortFull.mapPartition(new First(10)) 
first10.writeAsText("domains") 

MapPartitionFunctionFirst было бы очень просто. Он просто подсчитывает, сколько записей пересылает и возвращает из функции mapPartition() после того, как счетчик опустится до 0.