2016-09-09 7 views
1

Я пытаюсь использовать flink, чтобы написать файл csv как паркет. Я использую следующий код и получаю сообщение об ошибке.Ошибка Flink конвертировать в паркет

val parquetFormat = new HadoopOutputFormat[Void, String](new AvroParquetOutputFormat, job) 
FileOutputFormat.setOutputPath(job, new Path(outputPath)) 

Я получаю следующую ошибку сборки. Кто-то может помочь?

тип несоответствие; найдено: parquet.avro.AvroParquetOutputFormat требуется: org.apache.hadoop.mapreduce.OutputFormat [Void, String] ingestion.scala/flink-scala/src/main/scala/com/sc/edl/линия линии 75 Scala Задайте вопрос

ответ

1

Вы хотите создать HadoopOutputFormat[Void, String] который требует OutputFormat[Void, String].

Вы предоставляете AvroParquetOutputFormat, который распространяется на ParquetOutputFormat<IndexedRecord>. ParquetOutputFormat определяется как ParquetOutputFormat<T> extends FileOutputFormat<Void, T>.

Таким образом, вы предоставляете OutputFormat[Void, IndexedRecord], а HadoopOutputFormat[Void, String] ожидает OutputFormat[Void, String].

Вы должны изменить parquetFormat к

val parquetFormat = new HadoopOutputFormat[Void, IndexedRecord](
    new AvroParquetOutputFormat, job) 
FileOutputFormat.setOutputPath(job, new Path(outputPath)) 

Если DataSet, что вы хотите, чтобы выписать не типа (Void, IndexedRecord), вы должны добавить MapFunction, который преобразует данные в (Void, IndexedRecord) пар.

+0

Спасибо, Фабиан, извините, но я новичок в этом вопросе, можете ли вы предоставить правильный синтаксис или что не так – Niki

+0

Я продлил свой ответ –

1

По-прежнему проблема остается, поскольку Flink Tuple не поддерживает NULL Keys на данный момент. После произойдет ошибка: Caused by: org.apache.flink.types.NullFieldException: Field 1 is null, but expected to hold a value.

Лучший выбора будет использовать KiteSDK, как описан в этом примере: https://github.com/nezihyigitbasi/FlinkParquet Так что, если вам нужна динамическая схема, то этот подход не будет работать, потому что вы должны придерживаться схем строго. Более того, это лучше для чтения, а не для записи.

Spark DataFrame отлично работает с Parquet не только с точки зрения API, но и с точки зрения производительности. Но если вы хотите использовать Flink, вам нужно либо дождаться, пока сообщество flink опубликует api, либо отредактирует собственный паркет-хауоп-код, который может быть большим.

Только эти разъемы выполнены еще https://github.com/apache/flink/tree/master/flink-connectors Итак, мое личное предложение было бы, если вы можете использовать искру так пойти на это, она имеет более зрелый апи рассматривают случаи использования продукции. Поскольку вы застряли с базовой потребностью с флинком, вы можете застрять и в другом месте.

Не теряйте время, чтобы найти обойти с Флинком на данный момент, я потратил много времени на свое решающее время, предпочитая стандартные варианты, такие как Hive, Spark или MR.