2013-08-31 2 views
0

Я работаю над проектом, в котором у меня будут разные Связки. Возьмем пример. Предположим, у меня есть 5 пачек, и каждый из этих пучков будет иметь имя метода process.Задержка потока, если какой-либо комплект занимает много времени

В настоящее время я звоню process method из всех этих 5 пакетов последовательно, один за другим, а затем я пишу в базу данных. Но этого я не хочу.

  1. Мне нужно вызвать все эти 5 пачек process method параллельно, используя многопоточность, а затем записать в базу данных.
  2. И я также хочу иметь некоторую функцию тайм-аута для этих потоков. У меня будут настройки тайм-аута по умолчанию для всех потоков для пакетов. Если какой-либо пакет занимает несколько больше времени, чем настройки тайм-аута, которые у меня есть, то я хочу, чтобы тайм-аут этих потоков, а затем вернулся, заявив, что этот пакет получил тайм-аут bcoz, на это уходило много времени.

Я надеюсь, что вопрос достаточно ясен ...

Ниже приведен код, я до сих пор, который вызова метода процесс последовательно один за другим.

public void processEvents(final Map<String, Object> eventData) { 

    final Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER); 

    for (final BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) { 

     final Map<String, String> response = entry.getPlugin().process(outputs); 

     // write to the database. 
     System.out.println(response); 
    } 
} 

Я не уверен, что является лучшим и эффективным способом для этого? Потому что в будущем возможно, что у меня будет более 5 пакетов.

Может ли кто-нибудь предоставить мне пример того, как я могу это достичь? Любая помощь будет оценена по этому поводу. Благодарю.

ответ

1

Не так сложно добиться того, чего вы хотите, но вы должны знать, что с параллелизмом и тайм-аутами вы получаете дополнительную сложность, особенно когда дело касается обработки ошибок.

Например, потоки, выполняемые при возникновении таймаута, могут продолжать работать после таймаута. Только хорошо выполненные потоки, которые взаимодействуют при обработке сигнала прерывания, смогут успешно завершить обработку в середине обработки.

Вы также должны убедиться, что отдельные записи связки могут обрабатываться параллельно, то есть, что они являются потокобезопасными. Если они изменяют некоторый общий ресурс во время обработки, то в результате вы можете получить странные ошибки.

Мне также было интересно, хотите ли вы включить запись базы данных в каждый из этих потоков. Если это так, вам нужно будет обрабатывать прерывания при записи в базу данных; например путем отката транзакции.

В любом случае, чтобы получить пул потоков и общий тайм-аут для всех потоков, вы можете использовать ExecutorService с (например) фиксированным размером пула и вызывать все потоки, используя метод invokeAll.

Возможно, следующая попытка ошибочна, и обработка ошибок отнюдь не завершена, но она должна дать вам отправную точку.

Во-первых, реализация Callable для потоков:

public class ProcessBundleHolderEntry implements Callable { 
    private BundleRegistration.BundlesHolderEntry entry; 
    private Map<String, String> outputs; 

    public ProcessBundleHolderEntry(BundleRegistration.BundlesHolderEntry entry, Map<String, String> outputs) { 
     this.entry = entry; 
     this.outputs = outputs; 
    } 

    public Object call() throws Exception { 
     final Map<String, String> response = entry.getPlugin().process(outputs); 
     // write to the database. 
     System.out.println(response); 
     return response; 
    } 
} 

и теперь, модифицированный processEvents метод:

public void processEvents(final Map<String, Object> eventData) { 
    ExecutorService pool = Executors.newFixedThreadPool(5); 
    List<ProcessBundleHolderEntry> entries = new ArrayList<ProcessBundleHolderEntry>(); 

    Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER); 
    for (BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) { 
     ProcessBundleHolderEntry processBundleHolderEntry = new ProcessBundleHolderEntry(entry, outputs); 
     entries.add(processBundleHolderEntry); 
    } 

    try { 
     List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS); 
     for (int i = 0; i < futures.size(); i++) { 
      // This works since the list of future objects are in the 
      // same sequential order as the list of entries 
      Future<Object> future = futures.get(i); 
      ProcessBundleHolderEntry entry = entries.get(i); 
      if (!future.isDone()) { 
       // log error for this entry 
      } 
     } 
    } catch (InterruptedException e) { 
     // handle this exception! 
    } 
} 
+0

Один быстрый вопрос по этому вопросу. Метод 'processEvents' будет получать каждый раз для любых новых событий. Итак, внутри этого метода я буду создавать потоки каждый раз, используя ExecutorService так же, как вы это делали? – ferhan

+0

И в этом случае тайм-аут специфичен для каждого потока, то есть, если у меня есть 2 пакета, то тот поток, который выполняет пакет-A, получит тайм-аут, если он не сможет обработать этот период времени правильно? – ferhan

+0

Да, каждый раз, когда processEvents называется, он будет порождать новые потоки (макс. 5 в текущем имплементе) и ждать их завершения или таймаута. Если 'processEvents' вызывается из другого потока до того, как предыдущий был закончен, они также будут работать одновременно. – Steinar

0

Ответ дается Steinar является правильным, но это решение не является масштабируемым, как вы сказали: «В будущем возможно, что у меня будет более 5 пакетов». и я уверен, что вы можете добавлять пакеты во время выполнения или позже, если выполняются некоторые задачи, а также может быть ограничение, которое можно выполнить как можно ближе к «n» пакетам, в этом случае executorService.InvokeAll прекратит ожидающий задачи, которые не запускаются, если указан таймер.
Я создал простой образец, который может быть вам полезен. В этом примере приведена гибкость в отношении того, сколько потоков вы хотите запускать параллельно, и вы можете добавлять задачи или пакеты по мере необходимости.

import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.List; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import testproject.Bundles; 
import testproject.ExecuteTimedOperation; 

public class ParallelExecutor 
{ 
    public static int NUMBER_OF_PARALLEL_POLL = 4; 

    public static void main(String[] args) 
    {  
     ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_PARALLEL_POLL);  
     // Create bundle of objects you want 
     List<Bundles> lstBun = new ArrayList<Bundles>(); 
     for (Bundles bundles : lstBun) 
     { 
      final ExecuteTimedOperation ope =new ExecuteTimedOperation(bundles, new HashMap<String, Object>()); 
      executorService.submit(new Runnable() 
      { 
       public void run() 
       { 
        ope.ExecuteTask(); 
       } 
      }); 
     } 
    } 
} 
package testproject; 
import java.util.Map; 
import java.util.Random; 

public class ExecuteTimedOperation 
{ 
    Bundles _bun; 
    Map<String, Object> _eventData; 
    public static long TimeInMilleToWait = 60 * 1000; //Time which each thread should wait to complete task  

    public ExecuteTimedOperation(Bundles bun, Map<String, Object> eventData) 
    { 
     _bun = bun; 
     _eventData = eventData; 
    } 


    public void ExecuteTask() 
    { 
     try 
     {  
      Thread t = new Thread(new Runnable() 
      { 
       public void run() 
       { 
        _bun.processEvents(_eventData);     
       } 
      }); 

      t.start(); 
      t.join(TimeInMilleToWait); 
     } 
     catch (InterruptedException e) { 
       //log back saying this bundle got timeout bcoz it was taking lot of time. 
     } 
     catch (Exception e) { 
      //All other type of exception will be handled here  
     } 
    } 
} 

package testproject; 

import java.util.Map; 

public class Bundles 
{ 

    public void processEvents(final Map<String, Object> eventData) 
    { 
     //THE code you want to execute 

    } 
} 
+0

Вы, кажется, неправильно поняли, как работает «ExecutorService». У меня есть два жестко заданных значения в предлагаемом решении; 5 потоков в threadpool и тайм-аут 30 секунд. В реальной реализации оба этих значения должны быть настраиваемыми. Кроме того, даже если существует только 5 потоков, это не означает, что можно обрабатывать только 5 пакетов, это означает, что одновременно обрабатываются только 5 пакетов. Если у вас было, например, 10 пачек. Если 1 занял 35 секунд, но остальные 9 заняли 1 секунду, то вы все равно закончили бы 9 завершенных и 1 отмененную обработку. – Steinar

+0

Здесь вы смешиваете две разные парадигмы; используя «ExecutorService» для запуска потоков, которые ничего не делают, кроме запуска новых потоков, используя старый стиль 'новый поток' и' join'. Зачем ты это делаешь? Я не понимаю, что вы набираете, делая это. – Steinar

+0

Steinar, если вы посмотрите на код и вопрос еще раз, то комментарий от вас «если у вас было, например, 10 пачек. Если 1 занял 35 секунд, но остальные 9 заняли 1 секунду, то вы по-прежнему заканчиваются 9 завершенной и 1 отмененной обработкой ». это то, что нужно TechGreeky в качестве второго его вопроса: «Если какой-либо пакет занимает немного больше времени, чем у параметров тайм-аута, которые у меня есть, то я хочу отключить эти потоки, а затем вернуться назад, сказав, что этот пакет получил тайм-аут bcoz, который он принимал времени.". Поэтому этот комментарий здесь не имеет значения. – dbw