Я пишу MR-задание для ведения журналов веб-сервера. Вход для задания - из текстовых файлов, вывод - в базу данных MySQL. Проблема в том, что работа завершена успешно, но ничего не записывает в БД. Я некоторое время не программировал MR, поэтому, скорее всего, это ошибка, которую я не могу найти. Это не соответствие шаблонов (см. Ниже), что я тестировал блок и отлично работает. Что мне не хватает? Mac OS X, Oracle JDK 1.8.0_31, hadoop-2.6.0
Примечание: Исключения регистрируются, я опускал их для краткости.Hadoop MapReduce завершает работу успешно, но ничего не пишет в DB
SkippableLogRecord:
public class SkippableLogRecord implements WritableComparable<SkippableLogRecord> {
// fields
public SkippableLogRecord(Text line) {
readLine(line.toString());
}
private void readLine(String line) {
Matcher m = PATTERN.matcher(line);
boolean isMatchFound = m.matches() && m.groupCount() >= 5;
if (isMatchFound) {
try {
jvm = new Text(m.group("jvm"));
Calendar cal = getInstance();
cal.setTime(new SimpleDateFormat(DATE_FORMAT).parse(m
.group("date")));
day = new IntWritable(cal.get(DAY_OF_MONTH));
month = new IntWritable(cal.get(MONTH));
year = new IntWritable(cal.get(YEAR));
String p = decode(m.group("path"), UTF_8.name());
root = new Text(p.substring(1, p.indexOf(FILE_SEPARATOR, 1)));
filename = new Text(
p.substring(p.lastIndexOf(FILE_SEPARATOR) + 1));
path = new Text(p);
status = new IntWritable(Integer.parseInt(m.group("status")));
size = new LongWritable(Long.parseLong(m.group("size")));
} catch (ParseException | UnsupportedEncodingException e) {
isMatchFound = false;
}
}
public boolean isSkipped() {
return jvm == null;
}
@Override
public void readFields(DataInput in) throws IOException {
jvm.readFields(in);
day.readFields(in);
// more code
}
@Override
public void write(DataOutput out) throws IOException {
jvm.write(out);
day.write(out);
// more code
}
@Override
public int compareTo(SkippableLogRecord other) {...}
@Override
public boolean equals(Object obj) {...}
}
Картопостроитель:
public class LogMapper extends
Mapper<LongWritable, Text, SkippableLogRecord, NullWritable> {
@Override
protected void map(LongWritable key, Text line, Context context) {
SkippableLogRecord rec = new SkippableLogRecord(line);
if (!rec.isSkipped()) {
try {
context.write(rec, NullWritable.get());
} catch (IOException | InterruptedException e) {...}
}
}
}
Разбавление:
public class LogReducer extends
Reducer<SkippableLogRecord, NullWritable, DBRecord, NullWritable> {
@Override
protected void reduce(SkippableLogRecord rec,
Iterable<NullWritable> values, Context context) {
try {
context.write(new DBRecord(rec), NullWritable.get());
} catch (IOException | InterruptedException e) {...}
}
}
DBRecord:
public class DBRecord implements Writable, DBWritable {
// fields
public DBRecord(SkippableLogRecord logRecord) {
jvm = logRecord.getJvm().toString();
day = logRecord.getDay().get();
// more code for rest of the fields
}
@Override
public void readFields(ResultSet rs) throws SQLException {
jvm = rs.getString("jvm");
day = rs.getInt("day");
// more code for rest of the fields
}
@Override
public void write(PreparedStatement ps) throws SQLException {
ps.setString(1, jvm);
ps.setInt(2, day);
// more code for rest of the fields
}
}
Driver:
public class Driver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", // driver
"jdbc:mysql://localhost:3306/aac", // db url
"***", // user name
"***"); // password
Job job = Job.getInstance(conf, "log-miner");
job.setJarByClass(getClass());
job.setMapperClass(LogMapper.class);
job.setReducerClass(LogReducer.class);
job.setMapOutputKeyClass(SkippableLogRecord.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(DBRecord.class);
job.setOutputValueClass(NullWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(DBOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
DBOutputFormat.setOutput(job, "log", // table name
new String[] { "jvm", "day", "month", "year", "root",
"filename", "path", "status", "size" } // table columns
);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
GenericOptionsParser parser = new GenericOptionsParser(
new Configuration(), args);
ToolRunner.run(new Driver(), parser.getRemainingArgs());
}
}
Работа журнала выполнения:
15/02/28 02:17:58 INFO mapreduce.Job: map 100% reduce 100%
15/02/28 02:17:58 INFO mapreduce.Job: Job job_local166084441_0001 completed successfully
15/02/28 02:17:58 INFO mapreduce.Job: Counters: 35
File System Counters
FILE: Number of bytes read=37074
FILE: Number of bytes written=805438
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=476788498
HDFS: Number of bytes written=0
HDFS: Number of read operations=11
HDFS: Number of large read operations=0
HDFS: Number of write operations=0
Map-Reduce Framework
Map input records=482230
Map output records=0
Map output bytes=0
Map output materialized bytes=12
Input split bytes=210
Combine input records=0
Combine output records=0
Reduce input groups=0
Reduce shuffle bytes=12
Reduce input records=0
Reduce output records=0
Spilled Records=0
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=150
Total committed heap usage (bytes)=1381498880
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=171283337
File Output Format Counters
Bytes Written=0
Вы пробовали без Hadoop? Используйте его только в качестве последнего средства, если у вас есть рабочий процесс, который не масштабируется. Избавьтесь от всех «новых» вызовов в вашем внутреннем цикле - также нет нового «Матчи». Это очень дорого. И не игнорируйте исключения ... скорее всего, вы просто не разбираете каждую строку ... –
@ Anony-Mousse Как я уже сказал, разбор работает, потому что у меня есть единичный тест для него. Исключения на самом деле не игнорируются, я не показал их для краткости. И, наконец, я хочу, чтобы программа работала сначала, а затем беспокоиться о масштабировании. Программа, которая масштабируется красиво, но ничего не делает, не стоит ни копейки. –
Единичный тест в пределах mapreduce или с другими типами данных за пределами? По-видимому, ваша карта дает 0 записей! Так что это должно быть пропустить все. Кроме того, дизайн памяти сразу же, вместо того, чтобы переписывать его позже ... следовать лучшим практикам. Например, 'Text' существует, потому что' String' стоит слишком дорого, а 'IntWritable' - это _reusable_ Integer' –