2015-02-28 2 views
0

Я пишу MR-задание для ведения журналов веб-сервера. Вход для задания - из текстовых файлов, вывод - в базу данных MySQL. Проблема в том, что работа завершена успешно, но ничего не записывает в БД. Я некоторое время не программировал MR, поэтому, скорее всего, это ошибка, которую я не могу найти. Это не соответствие шаблонов (см. Ниже), что я тестировал блок и отлично работает. Что мне не хватает? Mac OS X, Oracle JDK 1.8.0_31, hadoop-2.6.0 Примечание: Исключения регистрируются, я опускал их для краткости.Hadoop MapReduce завершает работу успешно, но ничего не пишет в DB

SkippableLogRecord:

public class SkippableLogRecord implements WritableComparable<SkippableLogRecord> { 
    // fields 

    public SkippableLogRecord(Text line) { 
     readLine(line.toString()); 
    } 
    private void readLine(String line) { 
     Matcher m = PATTERN.matcher(line); 

     boolean isMatchFound = m.matches() && m.groupCount() >= 5; 

     if (isMatchFound) { 
     try { 
      jvm = new Text(m.group("jvm")); 

      Calendar cal = getInstance(); 
      cal.setTime(new SimpleDateFormat(DATE_FORMAT).parse(m 
      .group("date"))); 

      day = new IntWritable(cal.get(DAY_OF_MONTH)); 
      month = new IntWritable(cal.get(MONTH)); 
      year = new IntWritable(cal.get(YEAR)); 

      String p = decode(m.group("path"), UTF_8.name()); 

      root = new Text(p.substring(1, p.indexOf(FILE_SEPARATOR, 1))); 
      filename = new Text(
      p.substring(p.lastIndexOf(FILE_SEPARATOR) + 1)); 
      path = new Text(p); 

      status = new IntWritable(Integer.parseInt(m.group("status"))); 
      size = new LongWritable(Long.parseLong(m.group("size"))); 
     } catch (ParseException | UnsupportedEncodingException e) { 
      isMatchFound = false; 
     } 
    } 

    public boolean isSkipped() { 
     return jvm == null; 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     jvm.readFields(in); 
     day.readFields(in); 
     // more code 
    } 
    @Override 
    public void write(DataOutput out) throws IOException { 
     jvm.write(out); 
     day.write(out); 
     // more code 
    } 
    @Override 
    public int compareTo(SkippableLogRecord other) {...} 
    @Override 
    public boolean equals(Object obj) {...} 
} 

Картопостроитель:

public class LogMapper extends 
    Mapper<LongWritable, Text, SkippableLogRecord, NullWritable> {  
    @Override 
    protected void map(LongWritable key, Text line, Context context) { 
     SkippableLogRecord rec = new SkippableLogRecord(line); 

     if (!rec.isSkipped()) { 
      try { 
       context.write(rec, NullWritable.get()); 
      } catch (IOException | InterruptedException e) {...} 
     } 
    } 
} 

Разбавление:

public class LogReducer extends 
    Reducer<SkippableLogRecord, NullWritable, DBRecord, NullWritable> {  
    @Override 
    protected void reduce(SkippableLogRecord rec, 
     Iterable<NullWritable> values, Context context) { 
     try { 
      context.write(new DBRecord(rec), NullWritable.get()); 
     } catch (IOException | InterruptedException e) {...} 
    } 
} 

DBRecord:

public class DBRecord implements Writable, DBWritable { 
    // fields 

    public DBRecord(SkippableLogRecord logRecord) { 
     jvm = logRecord.getJvm().toString(); 
     day = logRecord.getDay().get(); 
     // more code for rest of the fields 
    } 

    @Override 
    public void readFields(ResultSet rs) throws SQLException { 
     jvm = rs.getString("jvm"); 
     day = rs.getInt("day"); 
     // more code for rest of the fields 
    } 

    @Override 
    public void write(PreparedStatement ps) throws SQLException { 
     ps.setString(1, jvm); 
     ps.setInt(2, day); 
     // more code for rest of the fields 
    } 
} 

Driver:

public class Driver extends Configured implements Tool { 
    @Override 
    public int run(String[] args) throws Exception { 
     Configuration conf = getConf(); 

     DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", // driver 
     "jdbc:mysql://localhost:3306/aac", // db url 
     "***", // user name 
     "***"); // password 

     Job job = Job.getInstance(conf, "log-miner"); 

     job.setJarByClass(getClass()); 
     job.setMapperClass(LogMapper.class); 
     job.setReducerClass(LogReducer.class); 
     job.setMapOutputKeyClass(SkippableLogRecord.class); 
     job.setMapOutputValueClass(NullWritable.class); 
     job.setOutputKeyClass(DBRecord.class); 
     job.setOutputValueClass(NullWritable.class); 
     job.setInputFormatClass(TextInputFormat.class); 
     job.setOutputFormatClass(DBOutputFormat.class); 

     FileInputFormat.setInputPaths(job, new Path(args[0])); 

     DBOutputFormat.setOutput(job, "log", // table name 
     new String[] { "jvm", "day", "month", "year", "root", 
      "filename", "path", "status", "size" } // table columns 
     ); 

     return job.waitForCompletion(true) ? 0 : 1; 
    } 
    public static void main(String[] args) throws Exception { 
     GenericOptionsParser parser = new GenericOptionsParser(
     new Configuration(), args); 

     ToolRunner.run(new Driver(), parser.getRemainingArgs()); 
    } 
} 

Работа журнала выполнения:

15/02/28 02:17:58 INFO mapreduce.Job: map 100% reduce 100% 
15/02/28 02:17:58 INFO mapreduce.Job: Job job_local166084441_0001 completed successfully 
15/02/28 02:17:58 INFO mapreduce.Job: Counters: 35 
    File System Counters 
     FILE: Number of bytes read=37074 
     FILE: Number of bytes written=805438 
     FILE: Number of read operations=0 
     FILE: Number of large read operations=0 
     FILE: Number of write operations=0 
     HDFS: Number of bytes read=476788498 
     HDFS: Number of bytes written=0 
     HDFS: Number of read operations=11 
     HDFS: Number of large read operations=0 
     HDFS: Number of write operations=0 
    Map-Reduce Framework 
     Map input records=482230 
     Map output records=0 
     Map output bytes=0 
     Map output materialized bytes=12 
     Input split bytes=210 
     Combine input records=0 
     Combine output records=0 
     Reduce input groups=0 
     Reduce shuffle bytes=12 
     Reduce input records=0 
     Reduce output records=0 
     Spilled Records=0 
     Shuffled Maps =2 
     Failed Shuffles=0 
     Merged Map outputs=2 
     GC time elapsed (ms)=150 
     Total committed heap usage (bytes)=1381498880 
    Shuffle Errors 
     BAD_ID=0 
     CONNECTION=0 
     IO_ERROR=0 
     WRONG_LENGTH=0 
     WRONG_MAP=0 
     WRONG_REDUCE=0 
    File Input Format Counters 
     Bytes Read=171283337 
    File Output Format Counters 
     Bytes Written=0 
+0

Вы пробовали без Hadoop? Используйте его только в качестве последнего средства, если у вас есть рабочий процесс, который не масштабируется. Избавьтесь от всех «новых» вызовов в вашем внутреннем цикле - также нет нового «Матчи». Это очень дорого. И не игнорируйте исключения ... скорее всего, вы просто не разбираете каждую строку ... –

+0

@ Anony-Mousse Как я уже сказал, разбор работает, потому что у меня есть единичный тест для него. Исключения на самом деле не игнорируются, я не показал их для краткости. И, наконец, я хочу, чтобы программа работала сначала, а затем беспокоиться о масштабировании. Программа, которая масштабируется красиво, но ничего не делает, не стоит ни копейки. –

+1

Единичный тест в пределах mapreduce или с другими типами данных за пределами? По-видимому, ваша карта дает 0 записей! Так что это должно быть пропустить все. Кроме того, дизайн памяти сразу же, вместо того, чтобы переписывать его позже ... следовать лучшим практикам. Например, 'Text' существует, потому что' String' стоит слишком дорого, а 'IntWritable' - это _reusable_ Integer' –

ответ

0

Чтобы ответить на мой собственный вопрос, вопрос был ведущим пробельные вызвавшие сличитель на провал. Единичный тест не тестировался с ведущими пробелами, но по каким-то причинам у реальных журналов были эти ошибки. Еще одна проблема с вышеприведенным кодом заключалась в том, что все поля в классе были инициализированы в методе readLine. Как уже упоминал Anony-Mousse, это дорого, потому что типы данных Hadoop предназначены для повторного использования. Это также вызвало большую проблему с сериализацией и десериализацией. Когда Hadoop попытался восстановить класс, вызвав readFields, он вызвал NPE, потому что все поля были пустыми. Я также сделал другие незначительные улучшения, используя некоторые классы и синтаксис Java 8. В конце концов, хотя я получил его работу, я переписал код, используя Spring Boot, Spring Data JPA и поддержку Spring для асинхронной обработки, используя @Async.

 Смежные вопросы

  • Нет связанных вопросов^_^