2015-10-30 10 views
1

У меня есть следующий код для подсчета слов из socketTextStream. Необходимы как кумулятивные подсчеты слов, так и подсчет числа окон. У программы есть проблема, что cumulateCounts всегда совпадает с количеством оконных окон. Почему эта проблема возникает? Каков правильный способ подсчета суммы подсчета кумулятов на подсчет окон?Apache Flink Streaming window WordCount

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>(); 

final DataStream<Tuple2<String, Integer>> counts = env 
      .socketTextStream("localhost", 9999) 
      .flatMap(new Splitter()) 
      .window(Time.of(5, TimeUnit.SECONDS)) 
      .groupBy(0).sum(1) 
      .flatten(); 

counts.print(); 

counts.addSink(new SinkFunction<Tuple2<String, Integer>>() { 
    @Override 
    public void invoke(Tuple2<String, Integer> value) throws Exception { 
     String word = value.f0; 
     Integer delta_count = value.f1; 
     Integer count = cumulateCounts.get(word); 
     if (count == null) 
      count = 0; 
     count = count + delta_count; 
     cumulateCounts.put(word, count); 
     System.out.println("(" + word + "," + count.toString() + ")"); 
    } 
}); 

ответ

4

Вы должны в первую группу мимо, и применить окно на поток шпоночным данных (ваш код работает на Флинка 0.9.1, но новый API в Флинка 0.10.0 строго об этом):

final DataStream<Tuple2<String, Integer>> counts = env 
     .socketTextStream("localhost", 9999) 
     .flatMap(new Splitter()) 
     .groupBy(0) 
     .window(Time.of(5, TimeUnit.SECONDS)).sum(1) 
     .flatten(); 

Если вы примените окно к потоку без ключа, на одной машине (то есть без параллелизма) будет создан только один оператор с потоковым окном для окна (во Flink 0.9.1 , это глобальное окно можно разделить на под-окна на groupBy() - однако в Flink 0.10.0 это больше не будет работать). Чтобы подсчитать слова, вы хотите создать окно для каждого отдельного значения ключа, то есть сначала вы получите субпоток для значения ключа (через groupBy()) и примените оператор окна для каждого подпотока (таким образом, у вас может быть собственное окно экземпляр оператора для каждого подпотока, допускающий параллельное выполнение).

Для глобального (кумулятивного) счета вы можете просто применить конструкцию groupBy().sum(). Во-первых, поток разбивается на подпоток (по одному для каждого значения ключа). Во-вторых, вы вычисляете сумму по потоку. Поскольку поток равен , а не оконный код, сумма вычисляется (суммируется) и обновляется для каждого входящего кортежа (более подробно сумма имеет начальное значение результата, равное нулю, и результат обновляется для каждого кортежа как result += tuple.value). После каждого вызова суммы возникает новый текущий результат.

В вашем коде, вы не должны использовать специальную функцию радиатора, но сделайте следующее:

counts.groupBy(0).sum(1).print(); 
+0

Спасибо за ваш ответ. Не могли бы вы объяснить, почему мое решение не удалось? В чем разница потоков данных в flink-кластере между этими решениями? –

+0

Обновлен мой ответ. –

 Смежные вопросы

  • Нет связанных вопросов^_^