Я пытаюсь написать вычисление в Flink, которое требует двух фаз.Повторное использование результатов первых вычислений во втором вычислении
На первом этапе я начинаю с текстового файла и выполняю некоторую оценку параметров, получая в результате объект Java, представляющий статистическую модель данных.
На втором этапе я хотел бы использовать этот объект для генерации данных для моделирования.
Я не уверен, как это сделать. Я пробовал с LocalCollectionOutputFormat
, и он работает локально, но когда я развертываю работу в кластере, я получаю NullPointerException
- что не удивительно.
Что такое Flink способ сделать это?
Вот мой код:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
GlobalConfiguration.includeConfiguration(configuration);
// Phase 1: read file and estimate model
DataSource<Tuple4<String, String, String, String>> source = env
.readCsvFile(args[0])
.types(String.class, String.class, String.class, String.class);
List<Tuple4<Bayes, Bayes, Bayes, Bayes>> bayesResult = new ArrayList<>();
// Processing here...
....output(new LocalCollectionOutputFormat<>(bayesResult));
env.execute("Bayes");
DataSet<BTP> btp = env
.createInput(new BayesInputFormat(bayesResult.get(0)))
// Phase 2: BayesInputFormat generates data for further calculations
// ....
Это исключение, которое я получаю:
Error: The program execution failed: java.lang.NullPointerException
at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
at java.lang.Thread.run(Thread.java:745)
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.NullPointerException
at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
at java.lang.Thread.run(Thread.java:745)
at org.apache.flink.client.program.Client.run(Client.java:328)
at org.apache.flink.client.program.Client.run(Client.java:294)
at org.apache.flink.client.program.Client.run(Client.java:288)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:55)
at it.list.flink.test.Test01.main(Test01.java:62)
...
Спасибо , это очень полезно. Я пробовал цепочки, но результат первого запуска - это в основном набор параметров, то есть одна строка. Распространяя эту строку для клиентов, я могу генерировать данные параллельно на каждом клиенте с помощью InputFormat, в то время как с цепочкой я не мог найти способ сделать то же самое. – Flavio
Не уверен, что я правильно понял, но вы могли бы использовать следующий трюк: –
Поместите параметры в широковещательную переменную и передайте ее в набор операторов «flatMap». Каждый из этих операторов flatMap будет иметь доступ ко всем параметрам и может испускать столько записей, сколько захотите. Однако вам нужно инициировать выполнение «flatMaps» с фиктивной входной записью, которую можно было бы подавать из формата ввода коллекции, содержащего «Integer» для каждого параллельного экземпляра «flatMap». Данные сборника IF должны быть «перебалансированы» перед входом в операторы «flatMap». –