У меня есть следующий код для подсчета слов из socketTextStream. Необходимы как кумулятивные подсчеты слов, так и подсчет числа окон. У программы есть проблема, что cumulateCounts всегда совпадает с количеством оконных окон. Почему эта проблема возникает? Каков правильный способ подсчета суммы подсчета кумулятов на подсчет окон?Apache Flink Streaming window WordCount
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>();
final DataStream<Tuple2<String, Integer>> counts = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.window(Time.of(5, TimeUnit.SECONDS))
.groupBy(0).sum(1)
.flatten();
counts.print();
counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
String word = value.f0;
Integer delta_count = value.f1;
Integer count = cumulateCounts.get(word);
if (count == null)
count = 0;
count = count + delta_count;
cumulateCounts.put(word, count);
System.out.println("(" + word + "," + count.toString() + ")");
}
});
Спасибо за ваш ответ. Не могли бы вы объяснить, почему мое решение не удалось? В чем разница потоков данных в flink-кластере между этими решениями? –
Обновлен мой ответ. –