2016-11-03 2 views
1

Я пытаюсь использовать Time_Ant10s (нестандартный класс ArrayWritable) в качестве вывода Reducer.NullPointerException в toString() метод класса CustomArrayWritable, MapReduce

Я рассматриваю этот хороший вопрос: MapReduce Output ArrayWritable, но я получаю NullPointerException в context.write() в последней строке редуктора.

Я полагаю, что get() в Time_Ant10s.toString() может вернуть null, но я понятия не имею, почему это происходит. Не могли бы вы помочь мне?

Главная Метод

public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    Job job = Job.getInstance(conf, "something"); 

    // general 
    job.setJarByClass(CommutingTime1.class); 
    job.setMapperClass(Mapper1.class); 
    job.setReducerClass(Reducer1.class); 
    job.setNumReduceTasks(1); 
    job.setInputFormatClass (TextInputFormat.class); 

    // mapper output 
    job.setMapOutputKeyClass(Date_Uid.class); 
    job.setMapOutputValueClass(Time_Ant10.class); 

    // reducer output 
    job.setOutputFormatClass(CommaTextOutputFormat.class); 
    job.setOutputKeyClass(IntWritable.class); 
    job.setOutputValueClass(Time_Ant10s.class); 

    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 

Mapper

public static class Mapper1 extends Mapper<LongWritable, Text, Date_Uid, Time_Ant10> { 
    /* map as <date_uid, time_ant10> */ 
    // omitted 
    } 
} 

Reducer

public static class Reducer1 extends Reducer<Date_Uid, Time_Ant10, IntWritable, Time_Ant10s> { 
    /* <date_uid, time_ant10> -> <date, time_ant10s> */ 

    private IntWritable date = new IntWritable(); 

    @Override 
    protected void reduce(Date_Uid date_uid, Iterable<Time_Ant10> time_ant10s, Context context) throws IOException, InterruptedException { 

     date.set(date_uid.getDate()); 

     // count ants 
     int num = 0; 
     for(Time_Ant10 time_ant10 : time_ant10s){ 
      num++; 
     } 

     if(num>=1){ 
      Time_Ant10[] temp = new Time_Ant10[num]; 

      int i=0; 
      for(Time_Ant10 time_ant10 : time_ant10s){ 
       String time = time_ant10.getTimeStr(); 
       int ant10 = time_ant10.getAnt10(); 
       temp[i] = new Time_Ant10(time, ant10); 
       i++; 
      } 

      context.write(date, new Time_Ant10s(temp)); 
     } 
    } 
} 

Писатель

public static class CommaTextOutputFormat extends TextOutputFormat<IntWritable, Time_Ant10s> { 
    @Override 
    public RecordWriter<IntWritable, Time_Ant10s> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { 
     Configuration conf = job.getConfiguration(); 
     String extension = ".txt"; 
     Path file = getDefaultWorkFile(job, extension); 
     FileSystem fs = file.getFileSystem(conf); 
     FSDataOutputStream fileOut = fs.create(file, false); 
     return new LineRecordWriter<IntWritable, Time_Ant10s>(fileOut, ","); 
    } 
} 

Пользовательские Writables

// Time 
public static class Time implements Writable { 
    private int h, m, s; 

    public Time() {} 

    public Time(int h, int m, int s) { 
     this.h = h; 
     this.m = m; 
     this.s = s; 
    } 

    public Time(String time) { 
     String[] hms = time.split(":", 0); 
     this.h = Integer.parseInt(hms[0]); 
     this.m = Integer.parseInt(hms[1]); 
     this.s = Integer.parseInt(hms[2]); 
    } 

    public void set(int h, int m, int s) { 
     this.h = h; 
     this.m = m; 
     this.s = s; 
    } 

    public void set(String time) { 
     String[] hms = time.split(":", 0); 
     this.h = Integer.parseInt(hms[0]); 
     this.m = Integer.parseInt(hms[1]); 
     this.s = Integer.parseInt(hms[2]); 
    } 

    public int[] getTime() { 
     int[] time = new int[3]; 
     time[0] = this.h; 
     time[1] = this.m; 
     time[2] = this.s; 
     return time; 
    } 

    public String getTimeStr() { 
     return String.format("%1$02d:%2$02d:%3$02d", this.h, this.m, this.s); 
    } 

    public int getTimeInt() { 
     return this.h * 10000 + this.m * 100 + this.s; 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     h = in.readInt(); 
     m = in.readInt(); 
     s = in.readInt(); 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     out.writeInt(h); 
     out.writeInt(m); 
     out.writeInt(s); 
    } 
} 

// Time_Ant10 
public static class Time_Ant10 implements Writable { 
    private Time time; 
    private int ant10; 

    public Time_Ant10() { 
     this.time = new Time(); 
    } 

    public Time_Ant10(Time time, int ant10) { 
     this.time = time; 
     this.ant10 = ant10; 
    } 

    public Time_Ant10(String time, int ant10) { 
     this.time = new Time(time); 
     this.ant10 = ant10; 
    } 

    public void set(Time time, int ant10) { 
     this.time = time; 
     this.ant10 = ant10; 
    } 

    public void set(String time, int ant10) { 
     this.time = new Time(time); 
     this.ant10 = ant10; 
    } 

    public int[] getTime() { 
     return this.time.getTime(); 
    } 

    public String getTimeStr() { 
     return this.time.getTimeStr(); 
    } 

    public int getTimeInt() { 
     return this.time.getTimeInt(); 
    } 

    public int getAnt10() { 
     return this.ant10; 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     time.readFields(in); 
     ant10 = in.readInt(); 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     time.write(out); 
     out.writeInt(ant10); 
    } 
} 

// Time_Ant10s 
public static class Time_Ant10s extends ArrayWritable { 
    public Time_Ant10s(){ 
     super(Time_Ant10.class); 
    } 

    public Time_Ant10s(Time_Ant10[] time_ant10s){ 
     super(Time_Ant10.class, time_ant10s); 
    } 

    @Override 
    public Time_Ant10[] get() { 
     return (Time_Ant10[]) super.get(); 
    } 

    @Override 
    public String toString() { 
     int time, ant10; 
     Time_Ant10[] time_ant10s = get(); 
     String output = ""; 

     for(Time_Ant10 time_ant10: time_ant10s){ 
      time = time_ant10.getTimeInt(); 
      ant10 = time_ant10.getAnt10(); 
      output += time + "," + ant10 + ","; 
     } 

     return output; 
    } 
} 

// Data_Uid 
public static class Date_Uid implements WritableComparable<Date_Uid> { 
    // omitted 
} 

Сообщение об ошибке

java.lang.Exception: java.lang.NullPointerException 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529) 
Caused by: java.lang.NullPointerException 
    at CommutingTime1$Time_Ant10s.toString(CommutingTime1.java:179) 
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.writeObject(TextOutputFormat.java:85) 
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.write(TextOutputFormat.java:104) 
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558) 
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89) 
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105) 
    at CommutingTime1$Reducer1.reduce(CommutingTime1.java:323) 
    at CommutingTime1$Reducer1.reduce(CommutingTime1.java:291) 
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171) 
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627) 
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
+0

проверить, есть ли 'null' элементы в' time_ant10s' массива .. если есть, проверьте, если вы можете избежать их. если не пропустить их в 'for loop' .. –

+1

Я проверил, есть ли значения в temp в нулях перед' context.write() 'в методе уменьшения. Как вы говорите, в temp есть несколько значений. Но в исходной date_ant10s нет значения null. Так что-то может быть неправильно в 'reduce' ... – myuuuuun

+0

Я нашел, что' temp' состоит из всех нулей ... – myuuuuun

ответ

0

Я обнаружил, что проблема Iterable в reduce не может повторяться дважды. Поэтому я ссылаюсь на this page и меняю редуктор и Time_Ant10s, как показано ниже. Теперь все идет хорошо.

@ redflar3: Большое спасибо за подсказку. Я совершенно неправильно понял, где мой код имеет ошибку.

Reducer

public static class Reducer1 extends Reducer<Date_Uid, Time_Ant10, IntWritable, Time_Ant10s> { 
    private IntWritable date = new IntWritable(); 

    @Override 
    protected void reduce(Date_Uid date_uid, Iterable<Time_Ant10> time_ant10s, Context context) throws IOException, InterruptedException { 
     String time = ""; 
     int ant10; 

     date.set(date_uid.getDate()); 

     ArrayList<Time_Ant10> temp_list = new ArrayList<Time_Ant10>(); 
     for (Time_Ant10 time_ant10 : time_ant10s){ 
      time = time_ant10.getTimeStr(); 
      ant10 = time_ant10.getAnt10(); 
      temp_list.add(new Time_Ant10(time, ant10)); 
     } 

     if(temp_list.size() >= 1){ 
      Time_Ant10[] temp_array = temp_list.toArray(new Time_Ant10[temp_list.size()]); 
      context.write(date, new Time_Ant10s(temp_array)); 
     } 
    } 
} 

Time_Ant10s

public static class Time_Ant10s extends ArrayWritable { 
    public Time_Ant10s(){ 
     super(Time_Ant10.class); 
    } 

    public Time_Ant10s(Time_Ant10[] time_ant10s){ 
     super(Time_Ant10.class, time_ant10s); 
    } 

    @Override 
    public Time_Ant10[] get() { 
     return (Time_Ant10[]) super.get(); 
    } 

    @Override 
    public String toString() { 
     int time, ant10; 
     Time_Ant10[] time_ant10s = get(); 
     String output = ""; 

     for(Time_Ant10 time_ant10: time_ant10s){ 
      time = time_ant10.getTimeInt(); 
      ant10 = time_ant10.getAnt10(); 
      output += time + "," + ant10 + ","; 
     } 

     return output; 
    } 
}