Искры генерируется несколько небольших паркет файлов. Как можно обрабатывать эффективно небольшое количество паркетных файлов как на рабочих, так и на рабочих местах Spark.Как эффективно читать несколько небольших паркетных файлов с помощью Spark? есть ли CombineParquetInputFormat?
ответ
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import parquet.avro.AvroReadSupport;
import parquet.hadoop.ParquetInputFormat;
import java.io.IOException;
public class CombineParquetInputFormat<T> extends CombineFileInputFormat<Void, T> {
@Override
public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptContext
context) throws IOException {
CombineFileSplit combineSplit = (CombineFileSplit) split;
return new CombineFileRecordReader(combineSplit, context, CombineParquetrecordReader.class);
}
private static class CombineParquetrecordReader<T> extends CombineFileRecordReaderWrapper<Void, T> {
public CombineParquetrecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx) throws
IOException, InterruptedException {
super(new ParquetInputFormat<T>(AvroReadSupport.class), split, context, idx);
}
}
}
На стороне потребителя, пожалуйста, воспользуйтесь CombinedParquetInputFile: который заставит несколько небольших файлов для чтения из одной задачи.
На стороне производителя: Пользователь объединяется (numFiles), чтобы иметь достаточное количество файлов в качестве вывода.
Как использовать customInputFileFormat в искру и форме RDD и Dataframes:
JavaRDD<Row> javaRDD = sc.newAPIHadoopFile(hdfsInputPath, CombineParquetInputFormat.class, Void.class, "AvroPojo.class", sc.hadoopConfiguration())
.values()
.map(p -> {
Row row = RowFactory.create(avroPojoToObjectArray((p));
return row;
});
sc.hadoopConfiguration().setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE,true);
//set max split size else only 1 task wil be spawned
sc.hadoopConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", (long) (128 * 1024 * 1024));
StructType outputSchema = (StructType) SchemaConverters.toSqlType(Profile.getClassSchema()).dataType();
final DataFrame requiredDataFrame = sqlContext.createDataFrame(javaRDD, outputSchema);
Пожалуйста, обратитесь к http://bytepadding.com/big-data/spark/combineparquetfileinputformat/ для глубокого понимания
Самый простой подход ИМХО использовать Repartition/COALESCE (предпочитают объединять, если данные не искажены, и вы хотите создавать выходы одинакового размера), прежде чем писать паркетные файлы, чтобы не создавать небольшие файлы для начала.
df
.map(<some transformation>)
.filter(<some filter>)
///...
.coalesce(<number of partitions>)
.write
.parquet(<path>)
Количество перегородок может быть рассчитана на кол полных строк в dataframe разделенных некоторым фактором, путем проб и ошибок даст вам правильный размер.
Это лучшая практика в большинстве больших структур данных, preffer несколько больших файлов для множества небольших файлов (размер файла обычно я использую 100-500MB)
Если у вас уже есть данные в небольших файлах, и вы хотите чтобы объединить его, насколько мне известно, вам придется прочитать его с переделкой Spark на меньшее количество разделов и записать его снова.
Пожалуйста, добавьте groupBy перед слиянием и, пожалуйста, соблюдайте дефрагментацию заданий. И это просто контроль со стороны производителя над небольшими файлами. Потребителю все равно придется читать много мелких паркетных файлов, так как в большинстве случаев производитель может не находиться под вашим контролем, и вы не захотите объединять данные, если их нужно только читать – KrazyGautam
@KrazyGautam, почему groupBy? что, если вы не хотите агрегировать? На другой теме я только что видел ваше решение для потребителя. Выглядит интересно, я не знал об этом ... Кстати, вы можете объединиться, чтобы уменьшить номер задачи, даже если вы только читаете один раз .. имея много задач может иметь значительные накладные расходы –