0

Я работаю над проектом, в котором у меня будут разные Bundles/Models. Давайте возьмем пример. Предположим, у меня есть 4 пакета, и каждый из этих пакетов будет иметь имя метода process.Проблема безопасности нити при выделении нити для каждого пакета

Ниже перечислены вещи, я должен DO-

  1. мне нужно называть все эти 4 Связки process method параллельно с использованием многопоточности и process method в каждом пучке будет возвращать мне карту, а затем записать эту карту в база данных в том же потоке или что-то другое - лучший подход (я не уверен, что это правильный путь).
  2. А также я хочу, чтобы на уровне потока была включена функция таймаута. Значение, если какой-либо Bundle занимает много времени для выполнения, то этот поток Bundle должен получить тайм-аут и регистрироваться как ошибка, заявляя, что этот конкретный пакет получил тайм-аут bcoz, который занимал много времени.

Следующая попытка, которую я совершил, скорее всего, испорчена, и обработка ошибок отнюдь не завершена. Может ли кто-нибудь вести меня, что я должен делать в случаях обработки ошибок?

Ниже приведен мой метод, который будет называть process method из всех комплектов многопоточным способом.

public void processEvents(final Map<String, Object> eventData) { 
    ExecutorService pool = Executors.newFixedThreadPool(5); 
    List<ProcessBundleHolderEntry> entries = new ArrayList<ProcessBundleHolderEntry>(); 

    Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER); 

    for (BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) { 
     ProcessBundleHolderEntry processBundleHolderEntry = new ProcessBundleHolderEntry(entry, outputs); 
     entries.add(processBundleHolderEntry); 
    } 

    try { 
     List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS); 
     for (int i = 0; i < futures.size(); i++) { 
      // This works since the list of future objects are in the 
      // same sequential order as the list of entries 
      Future<Object> future = futures.get(i); 
      ProcessBundleHolderEntry entry = entries.get(i); 
      if (!future.isDone()) { 
       // log error for this entry 
      } 
     } 
    } catch (InterruptedException e) { 
     // handle this exception! 
    } 
} 

Во-вторых, реализация Callable для потоков:

public class ProcessBundleHolderEntry implements Callable { 
    private BundleRegistration.BundlesHolderEntry entry; 
    private Map<String, String> outputs; 

    public ProcessBundleHolderEntry(BundleRegistration.BundlesHolderEntry entry, Map<String, String> outputs) { 
     this.entry = entry; 
     this.outputs = outputs; 
    } 

    public Object call() throws Exception { 
     final Map<String, String> response = entry.getPlugin().process(outputs); 
     // write to the database. 
     System.out.println(response); 
     return response; 
    } 
} 

Может кто-нибудь сказать мне, есть ли какие-либо проблемы с вышеизложенным подходом или есть лучше и эффективный способ сделать то же самое ? Я не уверен, есть ли проблема безопасности нитей.

Любая помощь будет оценена по этому вопросу.

ответ

1

Единственный общий объект в вашем коде: eventData: до тех пор, пока он не будет изменен во время работы этого метода (или если карта и ее содержимое являются потокобезопасными, а изменения будут опубликованы безопасно), все должно быть хорошо.

Что касается обработки исключений из ваших задач, вы обычно делаете:

try { 
    future.get(); 
} catch (ExecutionException e) { 
    Throwable exceptionInFuture = e.getCause(); 
    //throw, log or whatever is appropriate 
} 

Что касается прерванного исключения: это означает, что поток, в котором вы выполняете метод был прерван. Что вам нужно сделать, зависит от случая использования, но вы должны вообще прекратить то, что вы делаете, так что-то вроде:

} catch (InterruptedException e) { 
    pool.shutdownNow(); //cancels the tasks 
    //restore interrupted flag and exit 
    Thread.currentThread.interrupt(); 
    //or rethrow the exception 
    throw e; 
} 

Примечания: цель пулов потоков будет использоваться повторно - вы должны объявить службу исполнителя как переменную экземпляра (private final), а не создавать его каждый раз, когда вызывается метод processEvents.

+0

Спасибо assylias за предложение. Кроме того, есть еще одна проблема с моим кодом, о котором я должен беспокоиться? Одна из быстрых вещей, которые я хотел сообщить вам, - метод processEvents' будет вызываться каждый раз, и он не будет ждать завершения всех потоков в методе processEvents. Значение метода processEvents будет вызываться асинхронно каждый раз. Поэтому не уверен, приведет ли это к какой-либо проблеме или нет. Любые мысли? – ferhan

+0

Любые идеи о моем вышеупомянутом вопросе ..? Благодарю. – ferhan

+0

Для подробного обзора кода я предлагаю http: //codereview.stackexchange.com/- С точки зрения безопасности потока все зависит от того, изменена ли eventData во время операции или нет. Если это не так, тогда вы в порядке. Что касается вашего кода, 'future.isDone()' просто означает, что задача еще не завершена - это не значит, что есть ошибка. Поэтому я предлагал вместо этого использовать 'future.get()', но это означает, что метод не будет завершен, пока все фьючерсы не будут завершены, что, возможно, не то, что вы хотите ... – assylias