2016-05-25 6 views
1

Я программирую простой пример для тестирования нового Scala API для CEP в Flink, используя последнюю версию Github для 1.1-SNAPSHOT.Простой API Scala для примера CEP не показывает никакого вывода

Шаблон - это только проверка значения, и в результате получается один String для каждого сопоставленного шаблона. Код выглядит следующим образом:

val pattern : Pattern[(String, Long, Int), _] = Pattern.begin("start").where(_._3 < 4) 

val cepEventAlert = CEP.pattern(streamingAlert, pattern) 

def selectFn(pattern : mutable.Map[String, (String, Long, Int)]): String = { 
    val startEvent = pattern.get("start").get 
    "Alerta:"+startEvent._1+": Pattern" 
} 

val patternStreamSelected = cepEventAlert.select(selectFn(_)) 

patternStreamSelected.print() 

Он собирает и работает под 1.1-SNAPSHOT без проблем, но выход JobManager не показывает никаких признаков, что печать(). Даже расслабление условий шаблона и установка только «начала» (принятие всех событий) абсолютно ничего не возвращает.

Кроме того, при попытке добавить этапы код не удается скомпилировать. Если изменить шаблон на (Обнаружение двух последовательных событий с 3-го поля меньше, чем 4):

Pattern.begin("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30)) 

Компилятор затем Броски:

error: missing parameter type for expanded function ((x$4) => x$4._3.$less(4)) 

Отображение ошибки на первой, где() после того, как «старт». Я судимый явно установить тип параметра с помощью:

(x: (String, Long, Int)) => x._3 < 4 

Таким образом, он собирает снова, но когда он работает на Флинка, то вывод не показан. StreamingAlert - это Scala DataStream [(String, Long, Int)], и в других частях кода я могу без проблем фильтровать с _._ < 4, и результат кажется правильным.

ответ

1

Ошибка API в потоковом API не вызывает нетерпеливого исполнения. Вам еще нужно позвонить env.execute() в конце вашей программы.

Когда вы определяете свой шаблон, вы должны указать тип события где-нибудь. Либо вы делаете это, как вы сделали это или сделать это с помощью параметра типа для begin:

Pattern.begin[(String, Long, Int)]("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30)) 
+0

'env.execute()' называется, и я могу прочитать другой вывод потоков данных, а также задание в списке как работает в веб-интерфейсе, а задание CEP показывает, что происходит получение данных, но вывод 0B. Позже в коде 'streamingAlert' обрабатывается с помощью DataStream.filter()', и все кажется правильным (как в веб-интерфейсе, так и в журнале вывода). Единственное, что не может вывести какой-либо элемент, это PatternStream. Если бы у меня отсутствовал вызов 'execute()', то я предполагаю, что никакого вывода вообще не будет. – midnight1247

+0

Вы проверили файлы '.out' менеджеров задач? Оператор печати выполняется в TaskManager, и, таким образом, вывод записывается в файл stdout, а не в файл stdout JobManager (если только вы не запустили локальный кластер). –