2016-06-10 1 views
1

Я пишу пакетную работу по сокращению карты, которая состоит из 3-4 целых заданий. Во втором задании я использую собственный класс как класс выходных значений при записи в контекст через context.write(). При изучении поведения кода я заметил, что метод этого настраиваемого класса вызывается, а не write. Почему это происходит, если класс реализует интерфейс Writable, и я применил метод write?При написании контекста в методе Reducer.reduce, почему метод toString вызывается, а не метод write?

код пользовательского класса в:

import org.apache.hadoop.io.Writable; 
import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 


public class WritableLongPair implements Writable { 

private long l1; 
private long l2; 

public WritableLongPair() { 
    l1 = 0; 
    l2 = 0; 
} 

public WritableLongPair(long l1, long l2) { 
    this.l1 = l1; 
    this.l2 = l2; 
} 

@Override 
public void write(DataOutput dataOutput) throws IOException { 
    dataOutput.writeLong(l1); 
    dataOutput.writeLong(l2); 
} 

@Override 
public void readFields(DataInput dataInput) throws IOException { 
    l1 = dataInput.readLong(); 
    l2 = dataInput.readLong(); 
} 

@Override 
public String toString() { 
    return l1 + " " + l2; 
} 
} 

код второй работы по:

import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Partitioner; 
import org.apache.hadoop.mapreduce.Reducer; 

import java.io.IOException; 

public class Phase2 { 

private static final int ASCII_OFFSET = 97; 

public static class Mapper2 
     extends Mapper<Object, Text, Text, LongWritable>{ 

    @Override 
    public void map(Object key, Text value, Context context 
    ) throws IOException, InterruptedException { 
     String[] valueAsStrings = value.toString().split("\t"); 
     String actualKey = valueAsStrings[0]; 
     LongWritable actualValue = new LongWritable(Long.parseLong(valueAsStrings[1])); 
     String[] components = actualKey.toString().split("[$]"); 
     if (!components[1].equals("*")) { 
      context.write(new Text(components[1] + "$" + components[0]), actualValue); 
      context.write(new Text(components[1] + "$*"), actualValue); 
     } 
     context.write(new Text(actualKey), actualValue); 
    } 
} 

public static class Partitioner2 extends Partitioner<Text, LongWritable> { 

    @Override 
    public int getPartition(Text text, LongWritable longWritable, int i) { 
     return (int)(text.toString().charAt(0)) - ASCII_OFFSET; 
    } 
} 

public static class Reducer2 
     extends Reducer<Text, LongWritable, Text, WritableLongPair> { 

     private Text currentKey; 
     private long sum; 

    @Override 
    public void setup(Context context) { 
     currentKey = new Text(); 
     currentKey.set(""); 
     sum = 0l; 
    } 

    private String textContent(String w1, String w2) { 
     if (w2.equals("*")) 
      return w1 + "$*"; 
     if (w1.compareTo(w2) < 0) 
      return w1 + "$" + w2; 
     else 
      return w2 + "$" + w1; 
    } 

    public void reduce(Text key, Iterable<LongWritable> counts, 
         Context context 
    ) throws IOException, InterruptedException { 
     long sumPair = 0l; 
     String[] components = key.toString().split("[$]"); 
     for (LongWritable count : counts) { 
      if (currentKey.equals(components[0])) { 
       if (components[1].equals("*")) 
        sum += count.get(); 
       else 
        sumPair += count.get(); 
      } 
      else { 
       sum = count.get(); 
       currentKey.set(components[0]); 
      } 
     } 
     if (!components[1].equals("*")) 
      context.write(new Text(textContent(components[0], components[1])), new WritableLongPair(sumPair, sum)); 
    } 
} 

public static class Comparator2 extends WritableComparator { 

    @Override 
    public int compare(WritableComparable o1, WritableComparable o2) { 
     String[] components1 = o1.toString().split("[$]"); 
     String[] components2 = o2.toString().split("[$]"); 
     if (components1[1].equals("*") && components2[1].equals("*")) 
      return components1[0].compareTo(components2[0]); 
     if (components1[1].equals("*")) { 
      if (components1[0].equals(components2[0])) 
       return -1; 
      else 
       return components1[0].compareTo(components2[0]); 
     } 
     if (components2[1].equals("*")) { 
      if (components1[0].equals(components2[0])) 
       return 1; 
      else 
       return components1[0].compareTo(components2[0]); 
     } 
     return components1[0].compareTo(components2[0]); 
    } 

} 

} 

... и как я определяю мои работы:

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Counter; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class Manager { 

public static void main(String[] args) throws Exception { 
    Configuration conf1 = new Configuration(); 
    if (args.length != 2) { 
     System.err.println("Usage: Manager <in> <out>"); 
     System.exit(1); 
    } 
    Job job1 = Job.getInstance(conf1, "Phase 1"); 
    job1.setJarByClass(Phase1.class); 
    job1.setMapperClass(Phase1.Mapper1.class); 
    job1.setPartitionerClass(Phase1.Partitioner1.class); 
//  job1.setCombinerClass(Phase1.Combiner1.class); 
    job1.setReducerClass(Phase1.Reducer1.class); 
    job1.setInputFormatClass(SequenceFileInputFormat.class); 
//  job1.setOutputFormatClass(FileOutputFormat.class); 
    job1.setOutputKeyClass(Text.class); 
    job1.setOutputValueClass(LongWritable.class); 
    job1.setNumReduceTasks(12); 
    FileInputFormat.addInputPath(job1, new Path(args[0])); 
    Path output1 = new Path(args[1]); 
    FileOutputFormat.setOutputPath(job1, output1); 
    boolean result = job1.waitForCompletion(true); 
    Counter counter = job1.getCounters().findCounter("org.apache.hadoop.mapreduce.TaskCounter", "REDUCE_INPUT_RECORDS"); 
    System.out.println("Num of pairs sent to reducers in phase 1: " + counter.getValue()); 

    Configuration conf2 = new Configuration(); 
    Job job2 = Job.getInstance(conf2, "Phase 2"); 
    job2.setJarByClass(Phase2.class); 
    job2.setMapperClass(Phase2.Mapper2.class); 
    job2.setPartitionerClass(Phase2.Partitioner2.class); 
//  job2.setCombinerClass(Phase2.Combiner2.class); 
    job2.setReducerClass(Phase2.Reducer2.class); 
    job2.setMapOutputKeyClass(Text.class); 
    job2.setMapOutputValueClass(LongWritable.class); 
    job2.setOutputKeyClass(Text.class); 
    job2.setOutputValueClass(WritableLongPair.class); 
    job2.setNumReduceTasks(26); 
//  job2.setGroupingComparatorClass(Phase2.Comparator2.class); 
    FileInputFormat.addInputPath(job2, output1); 
    Path output2 = new Path(args[1] + "2"); 
    FileOutputFormat.setOutputPath(job2, output2); 
    result = job2.waitForCompletion(true); 
    counter = job2.getCounters().findCounter("org.apache.hadoop.mapreduce.TaskCounter", "REDUCE_INPUT_RECORDS"); 
    System.out.println("Num of pairs sent to reducers in phase 2: " + counter.getValue()); 


//  System.exit(job1.waitForCompletion(true) ? 0 : 1); 

} 
} 
+0

Выполняет ли задание фактически вывод данных на диск из редуктора? –

+0

Да - выдает его в форме: ' \ t '. – asafc

ответ

1

Если вы используете по умолчанию outputformatter (TextOutputFormat) Hadoop будет вызывать метод toString() объекта, когда он записывает его на него к. Это ожидаемое поведение. Вызывается context.write(), но его выходной формат контролирует, как данные появляются на диске.

Если вы связываете рабочие места вместе, вы обычно используете SequenceFileInputFormat и SequenceFileOutputFormat для всех заданий, так как это облегчает чтение результатов с одного задания на последующую работу.

+0

При использовании 'SequenceFileOutputFormat' будет ли текст, записанный в выходной файл, результатом выполнения сериализации в экземпляре класса, а наоборот - чтение с помощью' SequenceFileInputFormat' выполняет десериализацию текста? – asafc

+0

'SequenceFileOutputFormat' - это двоичный формат на диске, в котором используются методы' read' и 'write'. Те же методы будут использоваться 'SequenceFileInputFormat' для десериализации объекта. Итак, все, что вам нужно, правильно ввести типы ввода/значения в Mapper. –