0

Искры генерируется несколько небольших паркет файлов. Как можно обрабатывать эффективно небольшое количество паркетных файлов как на рабочих, так и на рабочих местах Spark.Как эффективно читать несколько небольших паркетных файлов с помощью Spark? есть ли CombineParquetInputFormat?

ответ

1
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader; 
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper; 
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit; 
import parquet.avro.AvroReadSupport; 
import parquet.hadoop.ParquetInputFormat; 

import java.io.IOException; 

public class CombineParquetInputFormat<T> extends CombineFileInputFormat<Void, T> { 


    @Override 
    public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptContext 
      context) throws IOException { 
     CombineFileSplit combineSplit = (CombineFileSplit) split; 
     return new CombineFileRecordReader(combineSplit, context, CombineParquetrecordReader.class); 
    } 

    private static class CombineParquetrecordReader<T> extends CombineFileRecordReaderWrapper<Void, T> { 


     public CombineParquetrecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx) throws 
       IOException, InterruptedException { 
      super(new ParquetInputFormat<T>(AvroReadSupport.class), split, context, idx); 
     } 
    } 
} 

На стороне потребителя, пожалуйста, воспользуйтесь CombinedParquetInputFile: который заставит несколько небольших файлов для чтения из одной задачи.

На стороне производителя: Пользователь объединяется (numFiles), чтобы иметь достаточное количество файлов в качестве вывода.

Как использовать customInputFileFormat в искру и форме RDD и Dataframes:

 JavaRDD<Row> javaRDD = sc.newAPIHadoopFile(hdfsInputPath, CombineParquetInputFormat.class, Void.class, "AvroPojo.class", sc.hadoopConfiguration()) 
              .values() 
              .map(p -> { 
               Row row = RowFactory.create(avroPojoToObjectArray((p)); 
               return row; 
              }); 


    sc.hadoopConfiguration().setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE,true); 


//set max split size else only 1 task wil be spawned  
sc.hadoopConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", (long) (128 * 1024 * 1024)); 


    StructType outputSchema = (StructType) SchemaConverters.toSqlType(Profile.getClassSchema()).dataType(); 
      final DataFrame requiredDataFrame = sqlContext.createDataFrame(javaRDD, outputSchema); 

Пожалуйста, обратитесь к http://bytepadding.com/big-data/spark/combineparquetfileinputformat/ для глубокого понимания

1

Самый простой подход ИМХО использовать Repartition/COALESCE (предпочитают объединять, если данные не искажены, и вы хотите создавать выходы одинакового размера), прежде чем писать паркетные файлы, чтобы не создавать небольшие файлы для начала.

df 
    .map(<some transformation>) 
    .filter(<some filter>) 
    ///... 
    .coalesce(<number of partitions>) 
    .write 
    .parquet(<path>) 

Количество перегородок может быть рассчитана на кол полных строк в dataframe разделенных некоторым фактором, путем проб и ошибок даст вам правильный размер.

Это лучшая практика в большинстве больших структур данных, preffer несколько больших файлов для множества небольших файлов (размер файла обычно я использую 100-500MB)

Если у вас уже есть данные в небольших файлах, и вы хотите чтобы объединить его, насколько мне известно, вам придется прочитать его с переделкой Spark на меньшее количество разделов и записать его снова.

+0

Пожалуйста, добавьте groupBy перед слиянием и, пожалуйста, соблюдайте дефрагментацию заданий. И это просто контроль со стороны производителя над небольшими файлами. Потребителю все равно придется читать много мелких паркетных файлов, так как в большинстве случаев производитель может не находиться под вашим контролем, и вы не захотите объединять данные, если их нужно только читать – KrazyGautam

+0

@KrazyGautam, почему groupBy? что, если вы не хотите агрегировать? На другой теме я только что видел ваше решение для потребителя. Выглядит интересно, я не знал об этом ... Кстати, вы можете объединиться, чтобы уменьшить номер задачи, даже если вы только читаете один раз .. имея много задач может иметь значительные накладные расходы –

 Смежные вопросы

  • Нет связанных вопросов^_^