Я работаю над потоковым процессором Flink, который читает события от Kafka. Эти события подкрепляются одним из полей и должны быть окончены в течение определенного периода времени, прежде чем быть уменьшены и выведены. Мой процессор использует время события как временную характеристику и поэтому считывает метку времени из событий, которые она потребляет. Вот что она в настоящее время выглядит следующим образом:Как использовать потоковое время FlinkWindow с метками метки и водяного знака?
source
.map(new MapEvent())
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
@Override public long extractTimestamp(Event event) {
return event.getTimestamp();
}
})
.keyBy(new KeySelector())
.timeWindow(Time.minutes(1))
.reduce(new EventReducer())
.map(new MapToResult());
Что я знаю о событиях, заключается в следующем:
- Они неупорядоченные относительно времени события.
- Возможно позднее прибытие, поэтому события могут прибыть значительно позже, чем указано в timestamp. Для удобства использования, допустим, я знаю, что последнее возможное прибытие будет 20 секунд.
- Я хочу, чтобы мои действия были окончены ровно за минуту до того, как Флинк переведет их в следующий оператор сокращения.
И, наконец, вот мои вопросы:
- Учитывая мои описывались ранее прецеденты, является
BoundedOutOfOrdernessTimestampExtractor
хорошим выбором? Я прочитал свой путь через документы и увиделAssignerWithPunctuatedWatermarks
и другие предопределенные цепочки, доступные для создания водяных знаков, но не понял полностью, если бы это было лучше для меня. - Как работает
assignTimestampsAndWatermarks()
вместе с методомtimeWindow()
? Могут ли они вмешаться, когда дело доходит до позднего прибытия? Есть ли что-нибудь, что я должен держать в этой области, о которой я должен помнить?
Спасибо, это помогло понять, как эти две вещи работают вместе. – Patze
Хорошо, я сделал некоторый прогресс в моей реализации, а также написал несколько тестов для проверки задержки и удаления окна. Те тесты ведут себя странно, потому что они дают невоспроизводимые результаты. Может быть, я все еще что-то упускаю. Вот что я делаю в тестах. Сначала я создаю кучу сущностей, время события которых точно соответствует одному времени (1 минута). Затем я создаю еще одну сущность, которая опаздывает (за пределами окна И разрешенной задержки). После этого я снова создаю те же самые события с первого шага (такое же время события) и отправлю их для переключения. Результат варьируется от run to run. – Patze
Что касается правильности ваших результатов, мне нужно будет посмотреть пример. Но для случайности я подозреваю, что вы не включили 'TimeCharacteristic.EventTime' в' StreamExecutionEnvironment', так ли это? –