2

У меня есть метод с именем process в двух моих классах, скажем CLASS-A and CLASS-B. Теперь в нижнем цикле я вызываю process method обоих моих классов, последовательно означающих один за другим, и он отлично работает, но это не тот способ, который я ищу.Как называть тот же метод другого класса многопоточным способом

for (ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) { 
    final Map<String, String> response = entry.getPlugin().process(outputs); 

    // write to database 
    System.out.println(response); 
} 

Есть ли способ, я могу назвать метод процесса обоих моих классов многопоточным способом. Значение одного потока вызовет метод процесса CLASS-A, а второй поток вызовет метод процесса CLASS-B.

И вот после этого я думал записать данные, возвращаемые методом process в базу данных. Поэтому у меня есть еще один поток для записи в базу данных.

Ниже приведен код, который я придумал многопоточным способом, но каким-то образом он не работает вообще.

public void writeEvents(final Map<String, Object> data) { 

    // Three threads: one thread for the database writer, two threads for the plugin processors 
    final ExecutorService executor = Executors.newFixedThreadPool(3); 

    final BlockingQueue<Map<String, String>> queue = new LinkedBlockingQueue<Map<String, String>>(); 

    @SuppressWarnings("unchecked") 
    final Map<String, String> outputs = (Map<String, String>)data.get(ModelConstants.EVENT_HOLDER); 

    for (final ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) { 
     executor.submit(new Runnable() { 
      public void run() { 
       final Map<String, String> response = entry.getPlugin().process(outputs); 
       // put the response map in the queue for the database to read 
       queue.offer(response); 
      } 
     }); 
    } 

    Future<?> future = executor.submit(new Runnable() { 
     public void run() { 
      Map<String, String> map; 
      try { 
       while(true) { 
        // blocks until a map is available in the queue, or until interrupted 
        map = queue.take(); 
        // write map to database 
        System.out.println(map); 
       } 
      } catch (InterruptedException ex) { 
       // IF we're catching InterruptedException then this means that future.cancel(true) 
       // was called, which means that the plugin processors are finished; 
       // process the rest of the queue and then exit 
       while((map = queue.poll()) != null) { 
        // write map to database 
        System.out.println(map); 
       } 
      } 
     } 
    }); 

    // this interrupts the database thread, which sends it into its catch block 
    // where it processes the rest of the queue and exits 
    future.cancel(true); // interrupt database thread 

    // wait for the threads to finish 
    try { 
     executor.awaitTermination(5, TimeUnit.MINUTES); 
    } catch (InterruptedException e) { 
     //log error here 
    } 
} 

Но если я удалить последнюю строку executor.awaitTermination(5, TimeUnit.MINUTES); затем начать работать нормально, и через какое-то время, я всегда получаю сообщение об ошибке, как this-

JVMDUMP006I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" - please wait. 
JVMDUMP032I JVM requested Heap dump using 'S:\GitViews\Stream\goldseye\heapdump.20130827.142415.16456.0001.phd' in response to an event 
JVMDUMP010I Heap dump written to S:\GitViews\Stream\goldseye\heapdump.20130827.142415.16456.0001.phd 
JVMDUMP006I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" - please wait. 

Может кто-нибудь мне помочь в выяснении, что проблема и что я делаю в моем вышеприведенном коде? если я запускаю последовательно, то я не получаю никаких ошибок, и он работает нормально.

А также есть ли лучший способ сделать это по сравнению с тем, как я это делаю? Потому что в будущем у меня может быть несколько процессоров плагинов по сравнению с двумя.

То, что я пытаюсь сделать, - вызвать метод обработки обоих моих классов многопоточным способом, а затем записать в базу данных. Bcoz мой метод процесса вернет обратно карту.

Любая помощь будет оценена по этому вопросу. И, если возможно, я ищу полезный пример. Спасибо за помощь,

+0

Я не специалист по этим вещам, поэтому мне пришлось искать много вещей. Но я был бы обеспокоен тем, что 'future.cancel' может прервать что-то важное (может даже прервать операции« BlockingQueue », которые должны быть атомарными). Вы пробовали какой-то другой метод синхронизации? Например. добавьте volatile boolean в ваш анонимный Runnable, добавьте метод его установки, запустите цикл в Runnable exit, когда булев установлен, а другой класс использует метод set, а не 'future.cancel'. Таким образом, у вас будет больше контроля. Но я просто догадываюсь. – ajb

+0

Выполнение того, что я предложил, означает, что вам, вероятно, придется написать именованный подкласс «Runnable» и объявить переменную этого класса, чтобы вы могли вызвать метод «set my cancel flag». – ajb

ответ

1

Введенный вами фрагмент кода имеет несколько проблем, если вы их исправите, это должно сработать.
1. Вы используете бесконечный цикл для извлечения элемента из очереди блокировки и пытаетесь разбить его с помощью будущего. Это, безусловно, не очень хороший подход. Проблема с этим подходом заключается в том, что поток базы данных никогда не запускается, потому что он может быть отменен будущей задачей, выполняемой в потоке вызывающего абонента, даже до ее запуска. Это подвержено ошибкам.
- Вы должны запустить фиксированное количество циклов while (вы уже знаете, сколько производителей существует или сколько раз вы собираетесь получить ответ).

  1. Кроме того, задачи представляются исполнитель услуг должна быть независимыми задачами ... вот ваша задача базы данных зависят от выполнения других tasks..this также может привести к тупиковой ситуации, если политика выполнения changes..for примера если вы используете однопоточный пул-исполнитель, и если поток базы данных запланирован, он просто блокирует ожидание того, что производители добавят данные в очередь.

    • Хорошим способом является создание задачи, которая извлекает данные и обновляет базу данных в одном потоке.
    • или получить все ответы, а затем выполнять операции базы данных в параллельном

    общественных недействительных writeEvents (данные итоговая карта) {

    final ExecutorService executor = Executors.newFixedThreadPool(3);  
    @SuppressWarnings("unchecked") 
    final Map<String, String> outputs = (Map<String, String>)data.get(ModelConstants.EVENT_HOLDER); 
    
    for (final ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) { 
        executor.submit(new Runnable() { 
         public void run() { 
          try { 
           final Map<String, String> response = entry.getPlugin().process(outputs); 
           //process the response and update database. 
           System.out.println(map); 
          } catch (Throwable e) { 
           //handle execption 
          } finally { 
           //clean up resources 
          }    
         } 
        }); 
    } 
    

    // Это будет ждать запуска потоков для завершения .. это упорядоченное закрытие. executor.shutdown(); }

+0

Спасибо Amol за предложение. Некоторые из ваших объяснений имеют смысл для меня. Есть ли способ, вы можете привести пример для моего кода? Спасибо за помощь.. – AKIWEB

0

ОК, вот несколько кодов для комментариев, которые я предложил выше. Отказ от ответственности: я не уверен, работает ли он или даже компилируется, или же он решает проблему. Но идея состоит в том, чтобы взять под контроль процесс отмены вместо того, чтобы полагаться на future.cancel, который, как я подозреваю, может вызвать проблемы.

class CheckQueue implements Runnable { 
    private volatile boolean cancelled = false; 
    public void cancel() { cancelled = true; } 
    public void run() { 
     Map<String, String> map; 
     try { 
      while(!cancelled) { 
       // blocks until a map is available in the queue, or until interrupted 
       map = queue.take(); 
       if (cancelled) break; 
       // write map to database 
       System.out.println(map); 
     } catch (InterruptedException e) { 
     } 
     while((map = queue.poll()) != null) { 
      // write map to database 
      System.out.println(map); 
     } 
    } 
} 

CheckQueue queueChecker = new CheckQueue(); 
Future<?> future = executor.submit(queueChecker); 

// this interrupts the database thread, which sends it into its catch block 
// where it processes the rest of the queue and exits 
queueChecker.cancel(); 
+0

Я смущаюсь с этим подходом, поскольку я не уверен, как интегрировать это с моим cpde .. Я не могу выполнить эту работу. – AKIWEB

+0

@ TrekkieTechieT-T: Я сделал некоторые ошибки, которые исправил в своем редактировании. (Мое объявление об отмене было испорчено, и вам все равно придется ловить 'InterruptedException', даже если вы больше не полагаетесь на прерывания.) Если вы уже собрали его для компиляции, но имели одинаковые результаты, это значит, что мой догадка была неправильной. Если вы его скомпилируете, но усугубили ситуацию, тогда мне нужно будет знать, что такое новое плохое поведение. – ajb