2015-04-30 7 views
3

Я пытаюсь написать вычисление в Flink, которое требует двух фаз.Повторное использование результатов первых вычислений во втором вычислении

На первом этапе я начинаю с текстового файла и выполняю некоторую оценку параметров, получая в результате объект Java, представляющий статистическую модель данных.

На втором этапе я хотел бы использовать этот объект для генерации данных для моделирования.

Я не уверен, как это сделать. Я пробовал с LocalCollectionOutputFormat, и он работает локально, но когда я развертываю работу в кластере, я получаю NullPointerException - что не удивительно.

Что такое Flink способ сделать это?

Вот мой код:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
GlobalConfiguration.includeConfiguration(configuration); 

// Phase 1: read file and estimate model 
DataSource<Tuple4<String, String, String, String>> source = env 
     .readCsvFile(args[0]) 
     .types(String.class, String.class, String.class, String.class); 

List<Tuple4<Bayes, Bayes, Bayes, Bayes>> bayesResult = new ArrayList<>(); 
// Processing here... 
....output(new LocalCollectionOutputFormat<>(bayesResult)); 

env.execute("Bayes"); 

DataSet<BTP> btp = env 
     .createInput(new BayesInputFormat(bayesResult.get(0))) 
// Phase 2: BayesInputFormat generates data for further calculations 
// .... 

Это исключение, которое я получаю:

Error: The program execution failed: java.lang.NullPointerException 
    at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86) 
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176) 
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257) 
    at java.lang.Thread.run(Thread.java:745) 

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.NullPointerException 
    at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86) 
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176) 
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257) 
    at java.lang.Thread.run(Thread.java:745) 

    at org.apache.flink.client.program.Client.run(Client.java:328) 
    at org.apache.flink.client.program.Client.run(Client.java:294) 
    at org.apache.flink.client.program.Client.run(Client.java:288) 
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:55) 
    at it.list.flink.test.Test01.main(Test01.java:62) 
    ... 

ответ

3

С помощью последней версии (0,9-веха-1) метод collect() был добавлен в Флинка

public List<T> collect() 

, который извлекает DataSet<T> в качестве List<T> в программу водителя. collect() также инициирует немедленное выполнение программы (не нужно звонить ExecutionEnvironment.execute()). В настоящее время существует ограничение по размеру для наборов данных около 10 МБ.

Если вы не оцениваете модели в программе драйвера, вы также можете объединить обе программы вместе и испустить модель сбоку, подключив приемник данных. Это будет более эффективным, поскольку данные не будут совершать обратный путь по клиентской машине.

+0

Спасибо , это очень полезно. Я пробовал цепочки, но результат первого запуска - это в основном набор параметров, то есть одна строка. Распространяя эту строку для клиентов, я могу генерировать данные параллельно на каждом клиенте с помощью InputFormat, в то время как с цепочкой я не мог найти способ сделать то же самое. – Flavio

+0

Не уверен, что я правильно понял, но вы могли бы использовать следующий трюк: –

+0

Поместите параметры в широковещательную переменную и передайте ее в набор операторов «flatMap». Каждый из этих операторов flatMap будет иметь доступ ко всем параметрам и может испускать столько записей, сколько захотите. Однако вам нужно инициировать выполнение «flatMaps» с фиктивной входной записью, которую можно было бы подавать из формата ввода коллекции, содержащего «Integer» для каждого параллельного экземпляра «flatMap». Данные сборника IF должны быть «перебалансированы» перед входом в операторы «flatMap». –

0

Если вы используете Flink до 0,9 вы можете использовать следующий фрагмент кода, чтобы собрать свой набор данных в локальной коллекции:

val dataJavaList = new ArrayList[K] 
val outputFormat = new LocalCollectionOutputFormat[K](dataJavaList) 
dataset.output(outputFormat) 
env.execute("collect()") 

Где K это тип объекта, который вы хотите собрать

+0

Я не знаю Scala, но мне кажется, что то, что вы предлагаете, равно тому, что я сообщал в моем вопросе, и с таким подходом я получаю NPE. – Flavio