2013-03-12 4 views
1

Я довольно новичок в hadoop, и я запускаю несколько заданий mapReduce на кластере из 5 узлов. Я начал получать исключения «Filesystem closed» при запуске более одного потока. Работы работают нормально, когда запускаются по одному. Ошибки появляются после сопоставления непосредственно перед сокращением. Это выглядит следующим образом:Исключение HFS-файловой системы при использовании threadpool

java.lang.Exception: java.io.IOException: Filesystem closed 
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:399) 
Caused by: java.io.IOException: Filesystem closed 
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:552) 
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:648) 
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:706) 
at java.io.DataInputStream.read(Unknown Source) 
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209) 
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173) 
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:167) 
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:526) 
at org.apache.hadoop.mapreduce.task.MapContextImpl.nextKeyValue(MapContextImpl.java:80) 
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.nextKeyValue(WrappedMapper.java:91) 
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143) 
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756) 
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:338) 
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:231) 
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) 
at java.util.concurrent.FutureTask.run(Unknown Source) 
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
at java.lang.Thread.run(Unknown Source) 

Это не происходит все время, и он будет работать хорошо, если я повторно выполнить неудавшуюся работу. К сожалению, это занимает слишком много времени. Я предполагаю, что это связано с несколькими задачами, обращающимися к одному и тому же входному файлу, и когда одна задача завершается, он закрывает входной файл для всех задач. Если это проблема, я хотел бы знать, как это переопределить. Я попробовал переопределить очистку внутри картографа, чтобы повторно открыть путь, но это кажется глупым и не работает.

@Override 
public void cleanup(Context context){ 
     Job tempJob; 
     try { 
      tempJob = new Job(); 
      Path fs = ((FileSplit) context.getInputSplit()).getPath(); 
      FileInputFormat.addInputPath(tempJob, fs); 
      System.out.println("Finished map task for " + context.getJobName()); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 

Я также интересно, если это фундаментальная проблема с использованием ThreadPool для выполнения заданий Hadoop MapReduce. Спасибо за любые идеи.

EDIT: Возможно, я был немного неясен, когда я имел в виду задания и задания. Я фактически выполняю несколько заданий со своими собственными мапперами и редукторами. Каждое из этих заданий будет генерировать столбец для конкретной таблицы, которую я создаю. Произнесите сумму или счет. Каждая работа имеет свой собственный поток, и все они получают доступ к одному и тому же входному файлу. Проблема, с которой я сталкиваюсь, заключается в том, что когда некоторые из заданий заканчиваются, они будут бросать «Исключенное исключение файловой системы». Я также использую пряжу, если это может изменить ситуацию.

ответ

1

Как правило, если у вас нет очень интенсивной работы с процессором, я бы не рекомендовал использовать несколько потоков в одной и той же задаче, это увеличивает вероятность возникновения проблем в вашей JVM, и затраты на повторную задачу больше. Вероятно, вам стоит подумать о том, чтобы увеличить количество задач по карте, но каждая задача будет работать в отдельной JVM, но она намного более чистая.

Если вы действительно хотите пойти многопоточный путь, то я подозреваю, что вы используете неправильный тип преобразователя, для многопоточных приложений в следуешь использовать MultithreadedMapper, который имеет другую реализацию run метод и должен быть потокобезопасным. Вы можете использовать его как это:

job.setMapperClass(MultithreadedMapper.class); 

Вы можете указать количество потоков, как это:

int numThreads = 42; 
MultithreadedMapper.setNumberOfThreads(numThreads); 
+0

I может перепутать разница между Иовом и задачи. Я фактически выполняю несколько заданий со своими собственными мапперами и редукторами. Каждое из этих заданий будет генерировать столбец для конкретной таблицы, которую я создаю. Произнесите сумму или счет. Каждая работа имеет свой собственный поток, и все они получают доступ к одному и тому же входному файлу. Проблема, с которой я сталкиваюсь, заключается в том, что когда некоторые из заданий заканчиваются, они будут бросать «Исключенное исключение файловой системы». Я не верю в эту ситуацию. Я запускаю несколько потоков в одной и той же задаче. Я попытаюсь уточнить свой предыдущий пост. – tchap

+0

Да, это исключение случается и для меня. в то время как 2 задания доступа к одному файлу. Рассмотрите это - 2 разных задания открыли один и тот же файл в hdf. один из них завершит свой процесс и закроет этот файл. если другой пытается получить доступ к файлу, возникает закрытое исключение «Файловая система». –