3

Я довольно новичок в параллелизме и параллелизме, и я пытаюсь реализовать алгоритм срединного фильтра, используя Fork-Join в Java. В основном я читаю входной файл в ArrayList и использую этот список для создания нового ArrayList фильтрованных медианов (включая первый и последний элемент исходного ArrayList).Java Fork-Join не работает с большим ArrayList

Теперь мне удалось создать последовательную/последовательную версию алгоритма, и он отлично работает. Однако, когда я пытался сделать версию Fork-Join, она не работает для больших ArrayLists (100000+). Я попробовал его с очень маленьким ArrayList размером 5, и он отлично работает. Кажется, я не могу найти свою ошибку (которая, я уверен, является логической ошибкой и/или ошибкой реализации). Любая помощь будет оценена по достоинству.

Вот последовательный алгоритм сниппет:

//Add first boundary element to output ArrayList 
    outputElements.add(this.elements.get(0)); 

    //Start Filter Algorithm 
    while(elements.size()-counter >= filterSize){ 
     for(int i = 0; i<filterSize; i++){ 
      tempElements.add(this.elements.get(i+counter)); 
      if(i==filterSize){ 
       break; 
      } 
     } 

     Collections.sort(tempElements); 
     outputElements.add(tempElements.get((filterSize-1)/2)); 

     counter++; 
     tempElements.clear(); 
    } 

    //Add last boundary element to output ArrayList. 
    if (elements != null && !elements.isEmpty()) { 
     outputElements.add(elements.get(elements.size()-1)); 
    }//End Filter Algorithm 

Вот параллельный класс я сделал. Это та часть, которая не работает:

public class Parallel extends RecursiveTask<List<Float>>{ 
    int lo; 
    int hi; 
    int filterSize; 
    String outFile; //Output file name. 
    int arraySize; 
    List<Float> elements = new ArrayList<Float>(); 
    List<Float> tempElements = new ArrayList<Float>(); 
    List<Float> outputElements = new ArrayList<Float>(); 
    int counter = 0; 
    static final int SEQUENTIAL_CUTOFF=1000; 

    public Parallel(List<Float> elements, int filterSize, String outFile, int lo, int hi) { 
     this.lo = lo; 
     this.hi = hi; 
     this.elements = elements; 
     this.outFile = outFile; 
     this.filterSize = filterSize;  
     if(lo == 0){ 
      outputElements.add(this.elements.get(0)); 
     } 
    } 
    @Override 
    protected List<Float> compute() { 
     long startTime = System.nanoTime(); //Algorithm starts here 
     if((hi-lo) < SEQUENTIAL_CUTOFF) { 
      while(hi-counter >= filterSize){ 
       for(int i = lo; i<filterSize; i++){ 
        tempElements.add(this.elements.get(i+counter)); 
        if(i==filterSize){ 
         break; 
        } 
       }    
       Collections.sort(tempElements); 
       outputElements.add(tempElements.get((filterSize-1)/2)); 
       counter++; 
       tempElements.clear(); 
       return outputElements; 
      } 
      }else{    
       Parallel left = new Parallel(this.elements, this.filterSize, this.outFile, this.lo, ((this.lo + this.hi)/2)); 
       Parallel right = new Parallel(this.elements, this.filterSize, this.outFile, ((this.hi + this.lo)/2), this.hi); 
       left.fork(); 

       List<Float> leftArr = new ArrayList<Float>(); 
       List<Float> rightArr = new ArrayList<Float>(); 

      rightArr = right.compute(); 
      leftArr = left.join(); 

      List<Float> newList = new ArrayList<Float>(); 
      newList.addAll(leftArr); 
      newList.addAll(rightArr);  

      } 
     long endTime = System.nanoTime();//Algorithm ends here. 

     //Write elements to output file 
     PrintWriter writeOutput = null; 
     try { 
      writeOutput = new PrintWriter(this.outFile, "UTF-8"); 
     } catch (FileNotFoundException | UnsupportedEncodingException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
     writeOutput.println(outputElements.size());//Number of lines 
     for(int i=0; i<outputElements.size();i++){ 
      writeOutput.println(i+1 + " " + outputElements.get(i)); //Each line is written 
     } 

     writeOutput.close(); //Close when output finished writing. 
     System.out.println("Parallel complete"); 
     return null; 
    } 
} 

Любая помощь очень ценится. Я не могу получить это правильно, потратив несколько часов и много разбираясь вокруг S.O и Google.

Редактировать: musical_coder предложил опубликовать ошибки, с которыми я столкнулся, и вот они. Это очень много ошибок:

Exception in thread "main" java.lang.IndexOutOfBoundsException 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) 
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526) 
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:536) 
    at java.util.concurrent.ForkJoinTask.reportResult(ForkJoinTask.java:596) 
    at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:640) 
    at java.util.concurrent.ForkJoinPool.invoke(ForkJoinPool.java:1521) 
    at main.main(main.java:45) 
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0 
    at java.util.ArrayList.rangeCheck(ArrayList.java:635) 
    at java.util.ArrayList.get(ArrayList.java:411) 
    at Parallel.compute(Parallel.java:44) 
    at Parallel.compute(Parallel.java:57) 
    at Parallel.compute(Parallel.java:57) 
    at Parallel.compute(Parallel.java:57) 
    at Parallel.compute(Parallel.java:57) 
    at Parallel.compute(Parallel.java:57) 
    at Parallel.compute(Parallel.java:57) 
    at Parallel.compute(Parallel.java:57) 
    at Parallel.compute(Parallel.java:57) 
    at Parallel.compute(Parallel.java:57) 
    at Parallel.compute(Parallel.java:57) 
    at Parallel.compute(Parallel.java:57) 
    at Parallel.compute(Parallel.java:1) 
    at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:93) 
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334) 
    at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604) 
    at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784) 
    at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646) 
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398) 
+0

В общем, пожалуйста, избегайте фразы «это не работает», так как это действительно не дает нам ничего, чтобы помочь вам. Вместо этого, описывайте конкретные проблемы, с которыми вы сталкиваетесь, - это сбой/попадание в бесконечный цикл/что-то еще? –

+0

@musical_coder Спасибо. Я опубликовал ошибки в редактировании. Я не уверен, насколько разумно публиковать все подобные ошибки. Не стесняйтесь редактировать его :) – nTuply

+0

Отлично, что трассировка стека полезна. Это похоже на множество ошибок, но на самом деле все это вызвано единственным исключением, которое разбило вашу программу, в частности, на Parallel.compute (Parallel.java:44) '. Причина: 'java.lang.IndexOutOfBoundsException: Index: 1, Size: 0' (т. Е. Вы пытаетесь получить доступ к элементу массива, который не существует). К сожалению, на данный момент у вас нет времени для более глубокого анализа, но посмотрите на строку 44 и посмотрите, где вы поступили неправильно. Вы также можете попробовать установить контрольную точку там, чтобы увидеть, можете ли вы обнаружить что-нибудь странное во время работы программы. –

ответ

0

В общем, следует избегать использования ArrayList с в многопоточном коде, так как он не поточно-:

Обратите внимание, что эта реализация не синхронизирована , Если несколько потоков обращаются к экземпляру ArrayList одновременно, и, по крайней мере, один из потоков изменяет список структурно, он должен быть синхронизированным извне.

Я не вижу ничего в фрагментах вы вывешенные, который модифицирует списки одновременно, но я сделать видеть вас выдав this.elements в детских Parallel случаях, то есть, как минимум, вы делаете что-то рискованное (указатели обмена к изменяемым, не потокобезопасным объектам между потоками).

В качестве первого прохода, замените this.elements = elements; в вашем Parallel конструктор со следующим:

this.elements = Collections.unmodifiableList(elements); 

Делая список unmodifiable, вы гарантируете, что если ваш Parallel код пытается мутировать список, вы будете получить ясную ошибку прямо в точке отказа. Это не мешает чему-то другому, вне Parallel от изменения оригинального списка elements, но это быстрый и простой способ проверки. Parallel ведет себя правильно. Если вы получите UnsupportedOperationException, вам необходимо будет изменить класс Parallel - вы не можете одновременно изменять ArrayList.

Если вы не получите UnsupportedOperationException, что-то еще изменяет ваш список. Вам нужно будет найти это и удалить его.


После того, как вы выяснили, что вызывает ваш список мутировать одновременно, вы можете попробовать, чтобы определить наилучший путь вперед. Пройдя через все «правильные» способы обмена данными между нитями за то, что я могу надеяться, чтобы покрыть в этом ответе, но вот несколько общих правил:

  • Избегайте изменяемые структуры данных - создать свой Parallel класс для обработки данных только из неизменяемых структур данных, например Guava's ImmutableList. По умолчанию неизменяемые структуры данных потокобезопасны.
  • Использование потокобезопасных структур данных - например, ConcurrentLinkedQueue - это поточный режим для нескольких процессов для чтения и записи в одну и ту же структуру данных. ConcurrentHashMap - еще один широко используемый класс. То, что вам нужно очень многое зависит от того, что вы пытаетесь сделать, но это хорошие отправные точки.
  • Сведите к минимуму объем ваших параллельных операций - даже с параллельными структурами данных ваша общая цель должна быть для каждой задачи выполняться изолированно, за исключением чтения из общего источника и записи на общий приемник. Делайте как можно больше работы над объектами, которые видны только одному потоку.
  • Синхронизировать - Я замечаю, что Parallel пишет в outFile без какой-либо явной синхронизации. Это опасно и может привести к возникновению проблем (либо сбоев, либо ухудшения данных). Только один поток должен когда-либо писать в файл за раз. Сделайте это либо с помощью выделенного потока для записи файлов, либо путем явной синхронизации операций записи файлов.