2017-02-13 18 views
0

В настоящее время я работаю над потоковой программой, которая объединяет данные из нескольких сообщений (8), для агрегации требуются все 8 сообщений, поэтому я использую окно счетчика. Все 8 сообщений имеют один и тот же уникальный ключ. Однако нет никакой гарантии, что все 8 сообщений прибудут. Итак, мой вопрос в два раза:Есть ли способ определить окно счета Flink, которое выдает все сообщения по истечении заданного времени, если счет не достигнут?

Первое, что происходит с окном подсчета Flink, которое никогда не закрывается? Я предполагаю, что окна просто накапливают сверхурочные, потребляя все больше и больше баранов.

Во-вторых, могу ли я закрыть окно счетчика, если оно не получит все его сообщения в течение заданного времени? Я ищу решение, которое как можно больше в реальном времени, я уже пробовал использовать временное окно, однако время пролета сообщений варьируется от нескольких миллисекунд до 40 секунд.

По существу, есть способ определить окно, которое запускает 8 сообщений, и выдает все сообщения из окна по истечении заданного времени (в этом случае через 60 секунд)?

ответ

1

Ответ на ваш вопрос, касающийся никогда закрытых окон, заключается в том, что часть зарезервированного для них государства никогда не будет освобождена.

Ваше описанное поведение может быть реализовано с помощью специального триггера и эврика в глобальном окне. Триггер мог либо ждать ожидаемое время, либо количество элементов перед выпуском окна, в то время как evictor вытеснил бы все сообщения, если их было меньше 8. Для некоторой ссылочной реализации вы можете посмотреть CountTrigger (испускается при подсчете) и EventTimeTrigger (испускает время). Для evictor посмотрите на CountEvictor.

1

Для таких случаев, когда вам нужно комбинировать обработку потока с потоком данных с таймерами, ProcessFunction может быть хорошим выбором. См. https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html.

+0

Похоже, что в этом примере есть некоторые ошибки, например, для кода «ctx.timerService(). RegisterEventTimeTimer (current.timestamp + 60000); текущий тип CountWithTimestamp и имеет три поля, однако ни один из них не назвал timestamp (current.timestamp). –

+0

Да, вы правы. Это было исправлено в документах для Flink 1.3. https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/process_function.html#example –

 Смежные вопросы

  • Нет связанных вопросов^_^