2014-02-18 1 views
1

Я внедрил пользовательский формат ввода файлов для создания разделов для задачи Map, состоящей из группы файлов. Я создал решение, которое передает каждый файл сплита через устройство чтения с записью, и все в порядке. Теперь я пытаюсь передать в функцию карты весь набор файлов.Hadoop - несколько файлов с устройства чтения записей на карту

Это мой рекорд код читателя:

public class MultiImagesRecordReader extends 
     RecordReader<Text[], BytesWritable[]> { 
private long start = 0; 
private long end = 0; 
private int pos = 0; 
private BytesWritable[] value; 
private Text key[]; 
private CombineFileSplit split; 
private Configuration conf; 
private FileSystem fs; 
private static boolean recordsRead; 

public MultiImagesRecordReader(CombineFileSplit split, 
     TaskAttemptContext context, Integer index) throws IOException { 
    this.split = split; 
    this.conf = context.getConfiguration(); 
} 

@Override 
public void initialize(InputSplit genericSplit, TaskAttemptContext context) 
     throws IOException, InterruptedException { 
    start = split.getOffset(0); 
    end = start + split.getLength(); 
    recordsRead = false; 
    this.pos = (int) start; 
    fs = FileSystem.get(conf); 
    value = new BytesWritable[split.getNumPaths()]; 
    key = new Text[split.getNumPaths()]; 
} 

@Override 
public boolean nextKeyValue() throws IOException, InterruptedException { 
    if (recordsRead == true) { 
     System.out.println("Sono nel next true"+InetAddress.getLocalHost()); 
     return false; 
    } else { 
     recordsRead = true; 
     System.out.println("Sono nel next false"+InetAddress.getLocalHost()); 
     for (int i = 0; i < split.getNumPaths(); i++) { 

      int fileLength = (int) split.getLength(i); 
      Path path = split.getPath(i); 
      byte[] result = new byte[fileLength]; 

      FSDataInputStream in = null; 

      String file_path = path.toString(); 
      key[i] = new Text(file_path); 
      try { 
       in = fs.open(path); 
       IOUtils.readFully(in, result, 0, fileLength); 

      } finally { 
       IOUtils.closeStream(in); 
      } 

      value[i] = new BytesWritable(result); 
     } 
     return true; 
    } 
} 

С помощью этого кода это случается, что функция карты получает правильно вектор ключей и значений, но неоднократно. Я имею в виду, я ожидал, что функция карты была вызвана один раз, вместо этого она называется несколько раз. Что я делаю не так?

ответ

0

Я думаю, вы знаете, что map() из Mapper будет вызываться для каждой записи, что ваш читатель возвращения из currentKey(), currentValue() до всех пар ключ-значение в данном Split закончили. Я понимаю, что ваша функция отображения неоднократно вызывается для той же пары значений ключа (которая должна быть вызвана один раз для пары с одним ключом). Это означает, что ваш считыватель записей читает одну и ту же запись (пару значений ключа) несколько раз. Я также внедрил пользовательские комбинированные форматы файлов и записи. Вы можете увидеть их общие формы here и реализации here в этом же проекте