2017-01-18 11 views
0

У меня есть некоторые устаревшие данные в S3, которые я хочу преобразовать в формат паркета, используя Spark 2, используя Java API.Преобразование данных в паркет в Spark

У меня есть желаемая схема Avro (файлы .avsc) и их сгенерированные классы Java с использованием компилятора Avro, и я хочу сохранить данные, используя эту схему в формате Паркет. Входные данные не в стандартном формате, но у меня есть библиотека, которая может преобразовывать каждую строку из старых файлов в классы Avro.

Можно ли считать данные как JavaRDD<String>, применить преобразование к классам Avro с помощью библиотеки и, наконец, сохранить его в паркетном формате.

Что-то вроде:

JavaRDD<String> rdd = javaSparkContext.textFile("s3://bucket/path_to_legacy_files");  
JavaRDD<MyAvroClass> converted = rdd.map(line -> customLib.convertToAvro(line));  
converted.saveAsParquet("s3://bucket/destination"); //how do I do this 

Это что-то вроде выше возможно? Позже я захочу обработать преобразованные данные паркета, используя Hive, Presto, а также Spark.

+0

Поиск для вр Summit Спарка. Стив Лофран (Horton) о «объектных магазинах» ... –

+0

@SamsonScharfrichter Не отвечает на мой вопрос. Единственное, что я видел, это то, как он преобразовал некоторые данные csv в Parquet. Он использует вызов sparkSession.csv() для загрузки данных, которые я не могу, поскольку мне нужно использовать собственный десериализатор. –

+0

Итак, в чем ваш ** актуальный вопрос? Речь идет о преобразовании пользовательского «JavaRDD » в обычный DataFrame? О сохранении ваших пользовательских материалов в формате Паркета? О сохранении этого в хранилище объектов S3? О способе чтения ваших пользовательских материалов с помощью другого инструмента, который не знает, что такое RDD? Комбинация вышеперечисленного? –

ответ

1

Игнорировать S3 на данный момент; это детализация производства. Вам нужно начать с более простой проблемы «преобразовать локальный файл в моем формате в стандартный». Это то, что вы можете реализовать локально с модульными тестами против небольшого набора образцов данных.

Это, как правило, то же самое в Спарк как Hadoop MapReduce: реализовать подкласс InputFormat<K, V> или FileInputFormat<K, V>, или использовать формат org.apache.hadoop.streaming.mapreduce.StreamInputFormat ввода Hadoop, в реализации собственного RecordReader, затем установите параметр spark.hadoop.stream.recordreader.class на имя класса вашей звукозаписывающей читателя (вероятно, простейший).

Существует много документации по этому вопросу, а также вопросы переполнения стека. И множество примеров в самих деревьях-источниках.

0

Фигурного это, в основном подход, упомянутый Стив, за исключением того, что входной Hadoop и выходные форматы уже существуют:

  Job job = new Job(); 
     ParquetOutputFormat.setWriteSupportClass(job, AvroWriteSupport.class); 
     AvroParquetOutputFormat.setSchema(job, MyAvroType.SCHEMA$); 
     AvroParquetOutputFormat.setBlockSize(job, 128*1024*1024); 
     AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY); 
     AvroParquetOutputFormat.setCompressOutput(job, true); 

     sparkContext.textFile("s3://bucket/path_to_legacy_files") 
      .map(line -> customLib.convertToAvro(line)) 
      .mapToPair(record -> new Tuple2<Void, MyAvroType>(null, record)) 
      .saveAsNewAPIHadoopFile(
       "s3://bucket/destination", 
       Void.class, 
       MyAvroType.class, 
       new ParquetOutputFormat<MyAvroType>().getClass(), 
       job.getConfiguration());