2017-02-01 15 views
2

Я работаю над потоковым процессором Flink, который читает события от Kafka. Эти события подкрепляются одним из полей и должны быть окончены в течение определенного периода времени, прежде чем быть уменьшены и выведены. Мой процессор использует время события как временную характеристику и поэтому считывает метку времени из событий, которые она потребляет. Вот что она в настоящее время выглядит следующим образом:Как использовать потоковое время FlinkWindow с метками метки и водяного знака?

source 
    .map(new MapEvent()) 
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) { 
     @Override public long extractTimestamp(Event event) { 
       return event.getTimestamp(); 
      } 
     }) 
    .keyBy(new KeySelector()) 
    .timeWindow(Time.minutes(1)) 
    .reduce(new EventReducer()) 
    .map(new MapToResult()); 

Что я знаю о событиях, заключается в следующем:

  1. Они неупорядоченные относительно времени события.
  2. Возможно позднее прибытие, поэтому события могут прибыть значительно позже, чем указано в timestamp. Для удобства использования, допустим, я знаю, что последнее возможное прибытие будет 20 секунд.
  3. Я хочу, чтобы мои действия были окончены ровно за минуту до того, как Флинк переведет их в следующий оператор сокращения.

И, наконец, вот мои вопросы:

  1. Учитывая мои описывались ранее прецеденты, является BoundedOutOfOrdernessTimestampExtractor хорошим выбором? Я прочитал свой путь через документы и увидел AssignerWithPunctuatedWatermarks и другие предопределенные цепочки, доступные для создания водяных знаков, но не понял полностью, если бы это было лучше для меня.
  2. Как работает assignTimestampsAndWatermarks() вместе с методом timeWindow()? Могут ли они вмешаться, когда дело доходит до позднего прибытия? Есть ли что-нибудь, что я должен держать в этой области, о которой я должен помнить?

ответ

3

Я думаю, что мы должны начать с концепции водяного знака. Коротко говоря, водяной знак говорит, что большинство событий с более ранними отметками уже прибыли. Исходя из этого предположения, timeWindow может испускать окно, когда водяной знак пропускает конец окна. Конечно, все еще могут произойти поздние прибытия, с которыми вы, возможно, захотите справиться. Здесь дается концепция allowedLateness, которая определяет, как долго после выпуска окна мы должны отслеживать элементы, которые были там, чтобы мы могли, например, обновить наш приемник с этими поздними событиями (но нужно помнить, что окно уже было выпущено без этого элемента) , Надеюсь, это как-то ответит на ваш вопрос секунд.

Возвращаясь к первому вопросу, если у вас есть много событий, которые могут быть опозданы на 20 секунд, я думаю, что лучший выбор - BoundedOutOfOrdernessTimestampExtractor. Таким образом, хотя излучение каждого окна будет задерживаться на эти 20 секунд. Если поздние прибытия довольно спорадические, и вы можете обрабатывать дубликаты, тогда вы можете подумать о другом.

AssignerWithPunctuatedWatermarks Вы упомянули, как говорится в документе, если некоторые события в вашем потоке уже действуют как водяной знак. Поэтому не думайте, что это подходит вашему делу.

Для получения дополнительной информации о водяных знаках вы можете прочитать doc или this и that

+0

Спасибо, это помогло понять, как эти две вещи работают вместе. – Patze

+0

Хорошо, я сделал некоторый прогресс в моей реализации, а также написал несколько тестов для проверки задержки и удаления окна. Те тесты ведут себя странно, потому что они дают невоспроизводимые результаты. Может быть, я все еще что-то упускаю. Вот что я делаю в тестах. Сначала я создаю кучу сущностей, время события которых точно соответствует одному времени (1 минута). Затем я создаю еще одну сущность, которая опаздывает (за пределами окна И разрешенной задержки). После этого я снова создаю те же самые события с первого шага (такое же время события) и отправлю их для переключения. Результат варьируется от run to run. – Patze

+0

Что касается правильности ваших результатов, мне нужно будет посмотреть пример. Но для случайности я подозреваю, что вы не включили 'TimeCharacteristic.EventTime' в' StreamExecutionEnvironment', так ли это? –

0

Может быть, ваш водяной знак всегда меньше, чем оконное EndTime, так что он не будет вызывать окно с получением results.Points того, как для запуска окна:

  1. водяной знак> = окно-конец.
  2. В этом окне есть некоторые элементы.