2017-01-06 12 views
0

// Создание окна из десяти пунктовСЕР - шаблон не выполняется после добавления окна

WindowedStream<ObservationEvent,Tuple,GlobalWindow> windowStream = inputStream.keyBy("rackId").countWindow(10); 

// Применение функционального окна, добавляя некоторые пользовательские оценивающих все значения в окне

DataStream<ObservationEvent> inactivityStream = windowStream.apply(new WindowFunction<ObservationEvent, ObservationEvent , Tuple , GlobalWindow>() { 

         @Override 
         public void apply(Tuple tuple, GlobalWindow timeWindow, Iterable<ObservationEvent> itr, Collector<ObservationEvent> out) 
           //custom evaluation logic 
           out.collect(new ObservationEvent(1,"temperature", "stable")); 
         } 
       }); 

// Определение простого шаблона КЭП

Pattern<ObservationEvent, ?> inactivityPattern = Pattern.ObservationEvent>begin("first") 
           .subtype(ObservationEvent.class) 
           .where(new FilterFunction<ObservationEvent>() { 

             @Override 
             public boolean filter(ObservationEvent arg0) throws Exception { 
               System.out.println(arg0); //This function is not at all called 
               return false; 
             } 
       }); 



PatternStream<ObservationEvent> inactivityCEP = CEP.pattern(inactivityStream.keyBy("rackId"), inactivityPattern); 

Когда я запускаю этот код, функция фильтра внутри предложения where вовсе не вызвано. Я напечатал inactivityStream.print(), и я вижу соответствующее значение.

Теперь, когда я подключаю входной поток напрямую, не применяя окно. Образец соответствует

Я напечатал inputStream и WindowedStream, и я вижу, что они оба отправляют похожие данные.

Что я упускаю

ответ

0

FilterFunction должен быть вызывался в конечном счете, но вы будете иметь, чтобы ждать 10 событий за тот же ключ, прежде чем вы видите свою FilterFunction под названием впервые. Может быть, вы просто не дожидаетесь достаточно долго в своем оконном тесте?

Имейте в виду, что если у вас много уникальных ключей, это означает, что вам придется подождать более 10 раз в окне теста, прежде чем вы увидите вызванную функцию фильтра.

+0

Зачем мне ждать?. Кроме того, макет данных имеет только один ключ. Окно распечатывается немедленно, но функция фильтра не вызывается. Извините, но я не получаю часть «Ожидание». – madhairsilence

+0

Вам нужно просто подождать, потому что не будет выхода из оконной функции для заданного ключа, пока вы не увидите 10 элементов для этого ключа. Можете ли вы опубликовать весь рабочий пример, чтобы я мог комментировать дальше? Трудно понять, что может быть причиной вашей проблемы без полного кода. –

 Смежные вопросы

  • Нет связанных вопросов^_^