2016-12-03 10 views
0

Я изучаю Map-Reduce с помощью Flink и задаюсь вопросом, как эффективно подсчитывать элементы в DataSet. То, что я до сих пор это:Flink: DataSet.count() является узким местом - как считать параллель?

DataSet<MyClass> ds = ...; 
long num = ds.count(); 

При выполнении этого, в моем Флинка войти он говорит

12/03/2016 19:47:27 DataSink (кол-()) (1/1) переключился на RUNNING

Таким образом, используется только один процессор (у меня есть четыре и другие команды, такие как уменьшение использования всех них).

Я думаю, что count() внутренне собирает DataSet со всех четырех процессоров и подсчитывает их последовательно, вместо того, чтобы каждый процессор подсчитывал свою часть, а затем суммировал ее. Это правда?

Если да, то как я могу использовать преимущества всех моих процессоров? Было бы неплохо сначала сопоставить мой DataSet с 2-кортежем, который содержит исходное значение как первый элемент, а длинное значение 1 - как второй элемент, а затем агрегировать его с помощью функции SUM?

Например, DataSet будет отображаться в DataSet>, где Long всегда будет равным 1. Поэтому, когда я суммирую все элементы, сумма второго значения кортежа будет правильным значением счетчика.

Какова наилучшая практика для подсчета элементов в DataSet?

Привет Саймон

ответ

0

DataSet#count() не является параллельной работы и, таким образом, можно использовать только один поток.

Вы должны сделать подсчет, чтобы получить распараллеливание и применить окончательную сумму по ключевым словам, чтобы получить общий счет, чтобы ускорить вычисление.

+0

Спасибо за ваш ответ. К сожалению, я не знаю, как я могу сделать подсчет. Это флинковая операция? Я не смог найти информацию об этом –

+0

Я нашел решение. Вы имели в виду это? \t \t \t набор данных> х = hasNum.map (новый MapFunction <МойКласс, Tuple1 >() { \t \t \t \t @Override \t \t \t \t общественных Tuple1 карту (МойКласс т) бросает исключение { \t \t \t \t \t возвращение новый Tuple1 (1L); \t \t \t \t} \t \t \t \t \t \t \t}). ГруппаBy (0).сумма (0); –

+0

Почти;) Я прокомментирую ваш собственный ответ –

0

Это хорошее решение?

DataSet<Tuple1<Long>> x = ds.map(new MapFunction<MyClass, Tuple1<Long>>() { 
    @Override public Tuple1<Long> map(MyClass t) throws Exception { 
     return new Tuple1<Long>(1L); 
    } 
}).groupBy(0).sum(0); 

Long c = x.collect().iterator().next().f0; 
+1

Общая идея верна - однако вы переходите к Tuple1 и используете одно и то же значение для всех кортежей - таким образом, все кортежи оказываются в одном и том же потоке, и это эффективно все еще однопоточно - даже если запускается несколько потоков, только один получит данные. Таким образом, вы должны генерировать случайное значение в 'new Tuple1 (...)'. Вместо 'sum (0)' вы используете 'count()'. Это даст вам несколько пунктов, которые вам нужно подвести итог. –

+0

Есть ли причина, по которой count() не делает это внутри? –

+0

Нет технической причины. Теоретически Флинк применил бы эту оптимизацию автоматически. Просто случается, что это никогда не было реализовано - вам нужно будет расследовать в списке рассылки, почему. –