У меня есть тысячи небольших файлов, и я хочу обработать их с помощью combFileInputFormat.isSplitable in combFileInputFormat не работает
В combFileInputFormat имеется несколько небольших файлов для одного картографа, каждый файл не будет разделен.
сниппет одного из малых входных файлов, как это,
vers,3
period,2015-01-26-18-12-00,438469546,449329626,complete
config,libdvm.so,chromeview
pkgproc,com.futuredial.digitchat,10021,,0ns:10860078
pkgpss,com.futuredial.digitchat,10021,,0ns:9:6627:6627:6637:5912:5912:5912
pkgsvc-run,com.futuredial.digitchat,10021,.LiveScreenService,1,0n:10860078
pkgsvc-start,com.futuredial.digitchat,10021,.LiveScreenService,1,0n:10860078
pkgproc,com.google.android.youtube,10103,,0ns:10860078
pkgpss,com.google.android.youtube,10103,,0ns:9:12986:13000:13021:11552:11564:11580
pkgsvc- run,com.google.android.youtube,10103,com.google.android.apps.youtube.app.offline.transfer.OfflineTransferService,1,0n:10860078
pkgsvc- start,com.google.android.youtube,10103,com.google.android.apps.youtube.app.offline.transfer.OfflineTransferService,1,0n:10860078
Я хочу передать все содержимое файла в картографа. Однако hadoop разделил файл на половину.
Например, выше файл может быть разделен на
vers,3
period,2015-01-26-18-12-00,438469546,449329626,complete
config,libdvm.so,chromeview
pkgproc,com.futuredial.digitchat,#the line has been cut
Но я хочу, чтобы содержимое всего файла для обработки.
Вот мой код, который ссылаться Reading file as single record in hadoop
Ведомая код
public class CombineSmallfiles {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: conbinesmallfiles <in> <out>");
System.exit(2);
}
conf.setInt("mapred.min.split.size", 1);
conf.setLong("mapred.max.split.size", 26214400); // 25m
//conf.setLong("mapred.max.split.size", 134217728); // 128m
//conf.setInt("mapred.reduce.tasks", 5);
Job job = new Job(conf, "combine smallfiles");
job.setJarByClass(CombineSmallfiles.class);
job.setMapperClass(CombineSmallfileMapper.class);
//job.setReducerClass(IdentityReducer.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
MultipleOutputs.addNamedOutput(job,"pkgproc",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"pkgpss",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"pkgsvc",TextOutputFormat.class,Text.class,Text.class);
job.setInputFormatClass(CombineSmallfileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
int exitFlag = job.waitForCompletion(true) ? 0 : 1;
System.exit(exitFlag);
}
}
Мой Mapper код
public class CombineSmallfileMapper extends Mapper<NullWritable, Text, Text, Text> {
private Text file = new Text();
private MultipleOutputs mos;
private String period;
private Long elapsed;
@Override
public void setup(Context context) throws IOException, InterruptedException {
mos = new MultipleOutputs(context);
}
@Override
protected void map(NullWritable key, Text value, Context context) throws IOException, InterruptedException {
String file_name = context.getConfiguration().get("map.input.file.name");
String [] filename_tokens = file_name.split("_");
String uuid = filename_tokens[0];
String [] datetime_tokens;
try{
datetime_tokens = filename_tokens[1].split("-");
}catch(ArrayIndexOutOfBoundsException err){
throw new ArrayIndexOutOfBoundsException(file_name);
}
String year,month,day,hour,minute,sec,msec;
year = datetime_tokens[0];
month = datetime_tokens[1];
day = datetime_tokens[2];
hour = datetime_tokens[3];
minute = datetime_tokens[4];
sec = datetime_tokens[5];
msec = datetime_tokens[6];
String datetime = year+"-"+month+"-"+"-"+day+" "+hour+":"+minute+":"+sec+"."+msec;
String content = value.toString();
String []lines = content.split("\n");
for(int u = 0;u<lines.length;u++){
String line = lines[u];
String []tokens = line.split(",");
if(tokens[0].equals("period")){
period = tokens[1];
try{
long startTime = Long.valueOf(tokens[2]);
long endTime = Long.valueOf(tokens[3]);
elapsed = endTime-startTime;
}catch(NumberFormatException err){
throw new NumberFormatException(line);
}
}else if(tokens[0].equals("pkgproc")){
String proc_info = "";
try{
proc_info += period+","+String.valueOf(elapsed)+","+tokens[2]+","+tokens[3];
}catch(ArrayIndexOutOfBoundsException err){
throw new ArrayIndexOutOfBoundsException("pkgproc: "+content+ "line:"+line);
}
for(int i = 4;i<tokens.length;i++){
String []state_info = tokens[i].split(":");
String state = "";
state += ","+state_info[0].charAt(0)+","+state_info[0].charAt(1)+","+state_info[0].charAt(2)+","+state_info[1];
mos.write("pkgproc",new Text(tokens[1]), new Text(proc_info+state+','+uuid+','+datetime));
}
}else if(tokens[0].equals("pkgpss")){
String proc_info = "";
proc_info += period+","+String.valueOf(elapsed)+","+tokens[2]+","+tokens[3];
for(int i = 4;i<tokens.length;i++){
String []state_info = tokens[i].split(":");
String state = "";
state += ","+state_info[0].charAt(0)+","+state_info[0].charAt(1)+","+state_info[0].charAt(2)+","+state_info[1]+","+state_info[2]+","+state_info[3]+","+state_info[4]+","+state_info[5]+","+state_info[6]+","+state_info[7];
mos.write("pkgpss",new Text(tokens[1]), new Text(proc_info+state+','+uuid+','+datetime));
}
}else if(tokens[0].startsWith("pkgsvc")){
String []stateName = tokens[0].split("-");
String proc_info = "";
//tokens[2] = uid, tokens[3] = serviceName
proc_info += stateName[1]+','+period+","+String.valueOf(elapsed)+","+tokens[2]+","+tokens[3];
String opcount = tokens[4];
for(int i = 5;i<tokens.length;i++){
String []state_info = tokens[i].split(":");
String state = "";
state += ","+state_info[0].charAt(0)+","+state_info[0].charAt(1)+","+state_info[1];
mos.write("pkgsvc",new Text(tokens[1]), new Text(proc_info+state+','+opcount+','+uuid+','+datetime));
}
}
}
}
}
Мой CombineFileInputFormat, который отменяет isSplitable и возвращают ложные
public class CombineSmallfileInputFormat extends CombineFileInputFormat<NullWritable, Text> {
@Override
public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
return new CombineFileRecordReader<NullWritable,Text>((CombineFileSplit) split,context,WholeFileRecordReader.class);
}
@Override
protected boolean isSplitable(JobContext context,Path file){
return false;
}
}
WholeFileRecordReader
public class WholeFileRecordReader extends RecordReader<NullWritable, Text> {
//private static final Logger LOG = Logger.getLogger(WholeFileRecordReader.class);
/** The path to the file to read. */
private final Path mFileToRead;
/** The length of this file. */
private final long mFileLength;
/** The Configuration. */
private final Configuration mConf;
/** Whether this FileSplit has been processed. */
private boolean mProcessed;
/** Single Text to store the file name of the current file. */
// private final Text mFileName;
/** Single Text to store the value of this file (the value) when it is read. */
private final Text mFileText;
/**
* Implementation detail: This constructor is built to be called via
* reflection from within CombineFileRecordReader.
*
* @param fileSplit The CombineFileSplit that this will read from.
* @param context The context for this task.
* @param pathToProcess The path index from the CombineFileSplit to process in this record.
*/
public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context,
Integer pathToProcess) {
mProcessed = false;
mFileToRead = fileSplit.getPath(pathToProcess);
mFileLength = fileSplit.getLength(pathToProcess);
mConf = context.getConfiguration();
context.getConfiguration().set("map.input.file.name", mFileToRead.getName());
assert 0 == fileSplit.getOffset(pathToProcess);
//if (LOG.isDebugEnabled()) {
//LOG.debug("FileToRead is: " + mFileToRead.toString());
//LOG.debug("Processing path " + pathToProcess + " out of " + fileSplit.getNumPaths());
//try {
//FileSystem fs = FileSystem.get(mConf);
//assert fs.getFileStatus(mFileToRead).getLen() == mFileLength;
//} catch (IOException ioe) {
//// oh well, I was just testing.
//}
//}
//mFileName = new Text();
mFileText = new Text();
}
/** {@inheritDoc} */
@Override
public void close() throws IOException {
mFileText.clear();
}
/**
* Returns the absolute path to the current file.
*
* @return The absolute path to the current file.
* @throws IOException never.
* @throws InterruptedException never.
*/
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}
/**
* <p>Returns the current value. If the file has been read with a call to NextKeyValue(),
* this returns the contents of the file as a BytesWritable. Otherwise, it returns an
* empty BytesWritable.</p>
*
* <p>Throws an IllegalStateException if initialize() is not called first.</p>
*
* @return A BytesWritable containing the contents of the file to read.
* @throws IOException never.
* @throws InterruptedException never.
*/
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return mFileText;
}
/**
* Returns whether the file has been processed or not. Since only one record
* will be generated for a file, progress will be 0.0 if it has not been processed,
* and 1.0 if it has.
*
* @return 0.0 if the file has not been processed. 1.0 if it has.
* @throws IOException never.
* @throws InterruptedException never.
*/
@Override
public float getProgress() throws IOException, InterruptedException {
return (mProcessed) ? (float) 1.0 : (float) 0.0;
}
/**
* All of the internal state is already set on instantiation. This is a no-op.
*
* @param split The InputSplit to read. Unused.
* @param context The context for this task. Unused.
* @throws IOException never.
* @throws InterruptedException never.
*/
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
// no-op.
}
/**
* <p>If the file has not already been read, this reads it into memory, so that a call
* to getCurrentValue() will return the entire contents of this file as Text,
* and getCurrentKey() will return the qualified path to this file as Text. Then, returns
* true. If it has already been read, then returns false without updating any internal state.</p>
*
* @return Whether the file was read or not.
* @throws IOException if there is an error reading the file.
* @throws InterruptedException if there is an error.
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!mProcessed) {
if (mFileLength > (long) Integer.MAX_VALUE) {
throw new IOException("File is longer than Integer.MAX_VALUE.");
}
byte[] contents = new byte[(int) mFileLength];
FileSystem fs = mFileToRead.getFileSystem(mConf);
FSDataInputStream in = null;
try {
// Set the contents of this file.
in = fs.open(mFileToRead);
IOUtils.readFully(in, contents, 0, contents.length);
mFileText.set(contents, 0, contents.length);
} finally {
IOUtils.closeQuietly(in);
}
mProcessed = true;
return true;
}
return false;
}
}
Я хочу, чтобы каждый картограф разобрать несколько небольших файлы и каждый маленький файл не может быть разделен.
Однако выше код будет вырезать (разбивать) мой файл ввода и поднять ошибку синтаксического анализа (поскольку мой синтаксический анализатор разделит строку на токены).
В моей концепции combFileInputFormat будет собирать несколько файлов в один раскол, и каждый раскол будет подаваться в один картограф. Поэтому один обработчик может обрабатывать несколько файлов.
В моем коде максимальное разделение ввода установлено на 25 МБ, поэтому я думаю, что проблема заключается в том, что combFileInputFormat разделит последнюю часть небольшого файла входного разделения, чтобы соответствовать пределу размера разделения.
Однако у меня есть переопределение isSplitable и возврат false, но он по-прежнему разбивает маленький файл.
Каков правильный способ сделать это?
Я не уверен, можно ли указать количество файлов в mapper, а не указывать размер входного разделения?
Он по-прежнему разделяет мой маленький файл, даже если я использую setMaxSplitSize (67108864); –