2016-06-10 12 views
0

В моем коде используется readTextFile для чтения файлов журнала, и когда я запускаю банку в Flink (/opt/flink-1.0.3/bin/flink run -m yarn-cluster -yn 2 /home/flink/flink-json-0.1.jar), он успешно обрабатывает строки внутри и останавливает мое приложение, а не ждет новых строк. Нужен ли мне какой-то параметр для этого?Почему flink останавливает приложение моего потока?

val env = StreamExecutionEnvironment.getExecutionEnvironment 
val stream = env.readTextFile("hdfs:///test/ignicion.io") 

Спасибо заранее

ответ

2

Вы ищете

StreamExecutionEnvironment.readFileStream(String filePath, long intervalMillis, WatchType watchType) 

Для WatchType у вас есть следующие варианты

  • ONLY_NEW_FILES,
  • REPROCESS_WITH_APPENDED,
  • PROCESS_ONLY_APPENDED;

Поток из

StreamExecutionEnvironment.readTextFile(String filePath, String charsetName) 

будет завершена после прочтения всех файлов. Я думаю, это в основном для локального тестирования во время разработки.

+0

Тот же результат: 'env.readFileStream (" hdfs: ///test/ignicion.io ", 100, FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED)' – jag

+0

Допустим, вы тоже остановились? – snntrable

+0

извините, он работает так, как вы писали ... – jag