Я попытался написать очень простую работу только с 1 картографом и без редуктора для записи некоторых данных в hbase. В mapper я попытался просто открыть соединение с hbase, записать несколько строк данных в таблицу и затем закрыть соединение. В драйвере задания я использую JobConf.setNumMapTasks (1); и JobConf.setNumReduceTasks (0); чтобы указать, что должны быть выполнены только 1 преобразователь и редуктор. Я также устанавливаю класс редуктора в IdentityReducer в jobConf. Странное поведение, которое я наблюдаю, заключается в том, что задание успешно записывает данные в таблицу hbase, но после этого я вижу в журналах, что он постоянно пытался открыть соединение с hbase, а затем закрывает соединение, которое продолжается 20-30 минут и после задания объявляется завершенным со 100% успеха. В конце, когда я проверяю файл _success, созданный фиктивными данными, которые я помещаю в OutputCollector.collect (...), я вижу сто строк фиктивных данных, когда их должно быть только 1. Ниже приведен код для драйвера заданияНеожиданное многократное выполнение mapper, предназначенное для запуска один раз
public int run(String[] arg0) throws Exception {
Configuration config = HBaseConfiguration.create(getConf());
ensureRequiredParametersExist(config);
ensureOptionalParametersExist(config);
JobConf jobConf = new JobConf(config, getClass());
jobConf.setJobName(config.get(ETLJobConstants.ETL_JOB_NAME));
//set map specific configuration
jobConf.setNumMapTasks(1);
jobConf.setMaxMapAttempts(1);
jobConf.setInputFormat(TextInputFormat.class);
jobConf.setMapperClass(SingletonMapper.class);
jobConf.setMapOutputKeyClass(LongWritable.class);
jobConf.setMapOutputValueClass(Text.class);
//set reducer specific configuration
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setOutputKeyClass(LongWritable.class);
jobConf.setOutputValueClass(Text.class);
jobConf.setOutputFormat(TextOutputFormat.class);
jobConf.setNumReduceTasks(0);
//set job specific configuration details like input file name etc
FileInputFormat.setInputPaths(jobConf, jobConf.get(ETLJobConstants.ETL_JOB_FILE_INPUT_PATH));
System.out.println("setting output path to : " + jobConf.get(ETLJobConstants.ETL_JOB_FILE_OUTPUT_PATH));
FileOutputFormat.setOutputPath(jobConf,
new Path(jobConf.get(ETLJobConstants.ETL_JOB_FILE_OUTPUT_PATH)));
JobClient.runJob(jobConf);
return 0;
}
Класс водителя расширяет Конфигурируемый и реализует инструмент (я использовал образец из окончательного руководства). Ниже приведен код в моем классе сопоставления.
Ниже приведен код в методе карты Mapper, где я просто открываю соединение с Hbase, делаю предварительную проверку, чтобы убедиться, что таблица существует, а затем записывать строки и закрывать таблицу.
public void map(LongWritable arg0, Text arg1,
OutputCollector<LongWritable, Text> arg2, Reporter arg3)
throws IOException {
HTable aTable = null;
HBaseAdmin admin = null;
try {
arg3.setStatus("started");
/*
* set-up hbase config
*/
admin = new HBaseAdmin(conf);
/*
* open connection to table
*/
String tableName = conf.get(ETLJobConstants.ETL_JOB_TABLE_NAME);
HTableDescriptor htd = new HTableDescriptor(toBytes(tableName));
String colFamilyName = conf.get(ETLJobConstants.ETL_JOB_TABLE_COLUMN_FAMILY_NAME);
byte[] tablename = htd.getName();
/* call function to ensure table with 'tablename' exists */
/*
* loop and put the file data into the table
*/
aTable = new HTable(conf, tableName);
DataRow row = /* logic to generate data */
while (row != null) {
byte[] rowKey = toBytes(row.getRowKey());
Put put = new Put(rowKey);
for (DataNode node : row.getRowData()) {
put.add(toBytes(colFamilyName), toBytes(node.getNodeName()),
toBytes(node.getNodeValue()));
}
aTable.put(put);
arg3.setStatus("xoxoxoxoxoxoxoxoxoxoxoxo added another data row to hbase");
row = fileParser.getNextRow();
}
aTable.flushCommits();
arg3.setStatus("xoxoxoxoxoxoxoxoxoxoxoxo Finished adding data to hbase");
} finally {
if (aTable != null) {
aTable.close();
}
if (admin != null) {
admin.close();
}
}
arg2.collect(new LongWritable(10), new Text("something"));
arg3.setStatus("xoxoxoxoxoxoxoxoxoxoxoxoadded some dummy data to the collector");
}
Как вы могли видеть вокруг конца, что я пишу некоторые фиктивные данные коллекции в конце концов (10, «что-то»), и я вижу сотни строк этих данных в файле _success после задания прекращается. Я не могу определить, почему код картера перезапускается несколько раз снова и снова, а не запускается только один раз. Любая помощь будет принята с благодарностью.