2013-07-03 4 views
1

У меня есть каталог, который содержит 1000 файлов csv, которые мне нужно проанализировать. Я выполнил класс ExecutorService Java для выполнения задания, в котором я назначил каждому потоку файл csv для разбора. У меня 4 ядра в моей машине. Я получаю некоторую эффективность по сравнению с однопоточным приложением. Однако, когда я вижу использование ЦП (используя диспетчер задач), похоже, что он не использует всю мощность процессора,% используемого процессора составляет около 30% -40%. Я просто хотел узнать, правильный ли мой подход.Параллельная обработка файлов в Java с помощью ExecutorService не использует всю мощность ЦП

File dir = new File(file); 
if(dir.isDirectory()){ 
    File[] files = dir.listFiles(); 

for(File f : files){ 
    String file_abs_path = f.getAbsolutePath(); 
    int index = file_abs_path.lastIndexOf("/") + 1; 
    file_name = file_abs_path.substring(index); 
    futuresList.add(eservice.submit(new MyParser(file_abs_path))); 
} 

Object gpDocs; 
for(Future<List<MyObj>> future:futuresList) { 
try { 
    docs = future.get(); 
    arrayList = (List<MyObj>)docs; 
    Iterator<MyObj> it = arrayList.iterator(); 
    while(it.hasNext()){ 
    doc = createDocument(file_name,it.next()); 
    try{ 
     //somefunction(doc); 
     }catch(Exception e){} 
}}catch (InterruptedException e) {} 
catch (ExecutionException e) {} 
}} 

Мне просто интересно, подходит ли мой подход? Любая помощь будет оценена по достоинству.

Благодаря

Код для синтаксического анализатора:

public List<MyObj> call(){ 
    ColumnPositionMappingStrategy<MyObj> strat = 
new ColumnPositionMappingStrategy<MyObj>(); 
strat.setType(MyObj.class); 
String[] columns = new String[] {//list of columns in the csv file}; 

strat.setColumnMapping(columns); 
CsvToBean<MyObj> csv = new CsvToBean<MyObj>(); 
BufferedReader reader = null; 
String doc_line = ""; 
String[] docs; 
String doc = ""; 
File dir = new File(file_path); 
try{ 
    int comma_count = 0; 
    reader = new BufferedReader(new FileReader(dir)); 
    while((doc_line = reader.readLine()) != null){ 
     docs = doc_line.split(","); 
    doc += docs[i] + " "; 
    } 
    reader.close(); 
    }catch (IOException e) {/*e.printStackTrace();*/} 
    return(csv.parse(strat,new StringReader(doc))); 
} 
+2

1. Как создать ExecutorService? 2. Насколько велики эти файлы? Это может быть очень хорошо связанная задача ввода/вывода, а не связанная с ЦП. 3. Не могли бы вы показать нам весь код (а не только эскиз)? –

+2

Ваша задача, вероятно, связана с IO, и слишком много потоков могут фактически уменьшить IO, если он заставляет голову жесткого диска прыгать назад и вперед между файлами. –

+0

Размер ребра около 500 МБ, и каждый файл должен быть около 2-3 МБ. Код, который я использую для создания ExecutorService, - int noProcs = Runtime.getRuntime(). AvailableProcessors(); ExecutorService eservice = Executors.newFixedThreadPool (noProcs); – dehsams123

ответ

1

Как отметил, ваша задача, скорее всего, IO связаны, так как большинство задач, связанных с IO на жестком диске обычно являются.

Наилучшее качество, на которое вы можете надеяться, скорее всего, отделит резьбу чтения от обработки. Вероятно, один поток чтения, максимально считывающий блоки данных и подавая их в очередь для обработки, даст наилучшую общую пропускную способность. Количество потоков обработки будет всего лишь необходимым, чтобы не отставать от чтения.

0

Однако, когда я вижу загрузку процессора (с помощью диспетчера задач) это не кажется, используя всю мощь процессора, то% использования процессора составляет примерно 30% -40%

Это потому, что вы обрабатываете files последовательно, а не параллельно. Смотрите следующий блок кода:

for(Future<List<MyObj>> future:futuresList) { 
try { 
    docs = future.get(); //(1) 
    arrayList = (List<MyObj>)docs;//(2) 

линия (1) является линия, где вы выполняете нить в real..But, что он делает?
Как вы сказали в комментарии
Это простой анализатор файлов CSV с использованием opencsv, который реализует интерфейс Callable и имеет логику синтаксического анализа в методе вызова, который он переопределяет.
И вы делаете потоки выполняться последовательно, а не параллельно. Кроме того, настоящий IO выполняется в следующем коде: Iterator it = arrayList.iterator();

while(it.hasNext()){ 
doc = createDocument(file_name,it.next()); 
try{ 
    //somefunction(doc); 
    }catch(Exception e){} 

createDocument Я думаю, это один с исчерпывающей связанной операцией ввода-вывода. И этот метод выполняется последовательно, а не параллельно каждому потоку.

+1

Я спросил точно так же: http://stackoverflow.com/questions/17456002/parallel-processing-of-files-in-java-with-executorservice-does-not-use-all-of-th/17456049#comment25362724_17456002 YU НЕТ ДАЕТ ВАШ КОД? –

+0

@pavelrappo +1 К вашему комментарию и этому тоже .. :) – Mac

+0

@pavelrappo: Извините, что .. Это просто простой синтаксический код с использованием opencsv. Я редактировал свой пост, чтобы включить код. – dehsams123

0

Как уже было опубликовано, убедитесь, что обработка происходит в ваших потоках, а не в одной диспетчерской нити. Я бы попробовал это относительно простое решение:

  • Дайте каждой теме File объект для работы. Это гарантирует, что фактическая работа выполняется в каждом потоке, а также что у вас ограниченное количество открытых файлов. (Если вы передали InputStream в потоки, например, вы сразу же открыли бы все файлы.Пройдя File с, у вас будет столько открытых файлов, сколько максимальное количество одновременных потоков.)
  • Позвольте использовать BufferedReader или BufferedInputStream с большим буфером, что-то вроде 1-4 МБ. Это заставляет ваше приложение читать большие блоки за раз, что намного эффективнее, чем несколько потоков, читающих маленькие кусочки и постоянный поиск жесткого диска. Надеемся, что ваша ОС будет планировать такие чтения, чтобы они не возникали одновременно.
  • Отправляйте их с помощью ExecutorService, возможно, с числом, немного большим, чем количество ваших процессоров, так что если какой-то поток заблокирован IO, есть достаточно других, которые работают (если для них есть работа, конечно).

Таким образом, результат может выглядеть примерно так:

File[] files = dir.listFiles(); 
final int bufSize = 1024*1024; 

// prepare tasks 
List<Callable<List<MyObj>>> tasks 
    = new ArrayList<Callable<List<MyObj>>>(); 
for(final File file : files) 
    tasks.add(new Callable<List<MyObj>>() { 
     public List<MyObj> call() throws Exception { 
      Reader r = new InputStreamReader(
        new BufferedInputStream(
         new FileInputStream(file), bufSize) 
       ); 
      try { 
       // do processing 
      } finally { 
       r.close(); 
      } 
     } 
    }); 

// run them 
int threadCount = Runtime.getRuntime().availableProcessors() + 2; 
List<Future<List<MyObj>>> results 
    = Executors.newFixedThreadPool(threadCount).invokeAll(tasks); 

Если выясняется, что обработка является узким местом, вместо диска IO, вы могли бы еще больше ускорить ваш парсер немного с помощью java.nio.

(Примечание: я только набросал код, я не пытался скомпилировать его.)