Это может быть довольно поздно, чтобы ответить, но я также столкнулся с этим вопросом и пришел к решению.
Во-первых, нет поддержки как «MultipleAvroParquetOutputFormat
» встроенный в parquet-mr
. Но для достижения подобного поведения я использовал MultipleOutputs
.
Для отображения только вида работы, поставить картограф так:
public class EventMapper extends Mapper<LongWritable, BytesWritable, Void, GenericRecord>{
protected KafkaAvroDecoder deserializer;
protected String outputPath = "";
// Using MultipleOutputs to write custom named files
protected MultipleOutputs<Void, GenericRecord> mos;
public void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
outputPath = conf.get(FileOutputFormat.OUTDIR);
mos = new MultipleOutputs<Void, GenericRecord>(context);
}
public void map(LongWritable ln, BytesWritable value, Context context){
try {
GenericRecord record = (GenericRecord) deserializer.fromBytes(value.getBytes());
AvroWriteSupport.setSchema(context.getConfiguration(), record.getSchema());
Schema schema = record.getSchema();
String mergeEventsPath = outputPath + "/" + schema.getName(); // Adding '/' will do no harm
mos.write((Void) null, record, mergeEventsPath);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
}
Это создаст новый RecordWriter
для каждой схемы и создает новый паркет файл, прилагаемый с именем схемы, например, , schema1-r-0000.parquet.
Это также создаст файлы part-r-0000x.parquet по умолчанию на основе схемы, установленной в драйвере. Чтобы избежать этого, используйте LazyOutputFormat
как:
LazyOutputFormat.setOutputFormatClass(job, AvroParquetOutputFormat.class);
Надеется, что это помогает.