2016-09-16 9 views
1

Я расчета счетчик (суммирующий 1) по timewindow следующим образом:Flink timeWindow получить время начала

mappedUserTrackingEvent 
      .keyBy("videoId", "userId") 
      .timeWindow(Time.seconds(30)) 
      .sum("count") 

Я хотел бы на самом деле добавить время начала окна в качестве ключевого поля тоже. поэтому результатом будет что-то вроде:

key: videoId=123,userId=234,time=2016-09-16T17:01:30 
value: 50 

Таким образом, совокупный подсчет по окну. Конечная цель - нарисовать гистограмму этих окон.

Как я могу добавить начало окна в качестве поля в ключе? и после этого выровнять окно с 00 или 30 в этом случае? Это возможно?

ответ

2

Способ по WindowFunctionapply() обеспечивает Window объект, который является TimeWindow, если вы используете keyBy().timeWindow(). Объект TimeWindow имеет два метода: getStart() и getEnd(), которые возвращают метку времени начала и конца окна, соответственно.

В настоящий момент невозможно использовать агрегацию sum() вместе с WindowFunction. Вам нужно сделать что-то вроде:

mappedUserTrackingEvent 
     .keyBy("videoId", "userId") 
     .timeWindow(Time.seconds(30)) 
     .apply(new MySumReduceFunction(), new MyWindowFunction());` 

MySumReduceFunction реализует интерфейс ReduceFunction и вычислить сумму путем постепенного объединения элементов, которые поступают в окне. MyWindowFunction реализует WindowFunction. Он получает агрегированное значение через параметр Iterable и обогащает значение меткой времени, полученной из параметра TimeWindow.

0

Вы можете использовать метод aggregate вместо суммы.
В aggregate установить второй параметр: WindowFunction или продолжит ProcessWindowFunction.
Я использую Flink-1.4.0, рекомендуется использовать ProcessWindowFunction, как:

mappedUserTrackingEvent 
    .keyBy("videoId", "userId") 
    .timeWindow(Time.seconds(30)) 
    .aggregate(new Count(), new MyProcessWindowFunction(); 

public static class MyProcessWindowFunction extends ProcessWindowFunction<Integer, Tuple2<Long, Integer>, Tuple, TimeWindow> 
{ 
    @Override 
    public void process(Tuple tuple, Context context, Iterable<Integer> iterable, Collector<Tuple2<Long, Integer>> collector) throws Exception 
    { 
     context.currentProcessingTime(); 
     context.window().getStart(); 
    } 
}