// Создание окна из десяти пунктовСЕР - шаблон не выполняется после добавления окна
WindowedStream<ObservationEvent,Tuple,GlobalWindow> windowStream = inputStream.keyBy("rackId").countWindow(10);
// Применение функционального окна, добавляя некоторые пользовательские оценивающих все значения в окне
DataStream<ObservationEvent> inactivityStream = windowStream.apply(new WindowFunction<ObservationEvent, ObservationEvent , Tuple , GlobalWindow>() {
@Override
public void apply(Tuple tuple, GlobalWindow timeWindow, Iterable<ObservationEvent> itr, Collector<ObservationEvent> out)
//custom evaluation logic
out.collect(new ObservationEvent(1,"temperature", "stable"));
}
});
// Определение простого шаблона КЭП
Pattern<ObservationEvent, ?> inactivityPattern = Pattern.ObservationEvent>begin("first")
.subtype(ObservationEvent.class)
.where(new FilterFunction<ObservationEvent>() {
@Override
public boolean filter(ObservationEvent arg0) throws Exception {
System.out.println(arg0); //This function is not at all called
return false;
}
});
PatternStream<ObservationEvent> inactivityCEP = CEP.pattern(inactivityStream.keyBy("rackId"), inactivityPattern);
Когда я запускаю этот код, функция фильтра внутри предложения where вовсе не вызвано. Я напечатал inactivityStream.print(), и я вижу соответствующее значение.
Теперь, когда я подключаю входной поток напрямую, не применяя окно. Образец соответствует
Я напечатал inputStream и WindowedStream, и я вижу, что они оба отправляют похожие данные.
Что я упускаю
Зачем мне ждать?. Кроме того, макет данных имеет только один ключ. Окно распечатывается немедленно, но функция фильтра не вызывается. Извините, но я не получаю часть «Ожидание». – madhairsilence
Вам нужно просто подождать, потому что не будет выхода из оконной функции для заданного ключа, пока вы не увидите 10 элементов для этого ключа. Можете ли вы опубликовать весь рабочий пример, чтобы я мог комментировать дальше? Трудно понять, что может быть причиной вашей проблемы без полного кода. –