Я программирую простой пример для тестирования нового Scala API для CEP в Flink, используя последнюю версию Github для 1.1-SNAPSHOT.Простой API Scala для примера CEP не показывает никакого вывода
Шаблон - это только проверка значения, и в результате получается один String для каждого сопоставленного шаблона. Код выглядит следующим образом:
val pattern : Pattern[(String, Long, Int), _] = Pattern.begin("start").where(_._3 < 4)
val cepEventAlert = CEP.pattern(streamingAlert, pattern)
def selectFn(pattern : mutable.Map[String, (String, Long, Int)]): String = {
val startEvent = pattern.get("start").get
"Alerta:"+startEvent._1+": Pattern"
}
val patternStreamSelected = cepEventAlert.select(selectFn(_))
patternStreamSelected.print()
Он собирает и работает под 1.1-SNAPSHOT без проблем, но выход JobManager не показывает никаких признаков, что печать(). Даже расслабление условий шаблона и установка только «начала» (принятие всех событий) абсолютно ничего не возвращает.
Кроме того, при попытке добавить этапы код не удается скомпилировать. Если изменить шаблон на (Обнаружение двух последовательных событий с 3-го поля меньше, чем 4):
Pattern.begin("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))
Компилятор затем Броски:
error: missing parameter type for expanded function ((x$4) => x$4._3.$less(4))
Отображение ошибки на первой, где() после того, как «старт». Я судимый явно установить тип параметра с помощью:
(x: (String, Long, Int)) => x._3 < 4
Таким образом, он собирает снова, но когда он работает на Флинка, то вывод не показан. StreamingAlert - это Scala DataStream [(String, Long, Int)], и в других частях кода я могу без проблем фильтровать с _._ < 4
, и результат кажется правильным.
'env.execute()' называется, и я могу прочитать другой вывод потоков данных, а также задание в списке как работает в веб-интерфейсе, а задание CEP показывает, что происходит получение данных, но вывод 0B. Позже в коде 'streamingAlert' обрабатывается с помощью DataStream.filter()', и все кажется правильным (как в веб-интерфейсе, так и в журнале вывода). Единственное, что не может вывести какой-либо элемент, это PatternStream. Если бы у меня отсутствовал вызов 'execute()', то я предполагаю, что никакого вывода вообще не будет. – midnight1247
Вы проверили файлы '.out' менеджеров задач? Оператор печати выполняется в TaskManager, и, таким образом, вывод записывается в файл stdout, а не в файл stdout JobManager (если только вы не запустили локальный кластер). –