Я работаю над проектом, в котором у меня будут разные Bundles/Models. Давайте возьмем пример. Предположим, у меня есть 4 пакета, и каждый из этих пакетов будет иметь имя метода process
.Проблема безопасности нити при выделении нити для каждого пакета
Ниже перечислены вещи, я должен DO-
- мне нужно называть все эти 4 Связки
process method
параллельно с использованием многопоточности иprocess method
в каждом пучке будет возвращать мне карту, а затем записать эту карту в база данных в том же потоке или что-то другое - лучший подход (я не уверен, что это правильный путь). - А также я хочу, чтобы на уровне потока была включена функция таймаута. Значение, если какой-либо Bundle занимает много времени для выполнения, то этот поток Bundle должен получить тайм-аут и регистрироваться как ошибка, заявляя, что этот конкретный пакет получил тайм-аут bcoz, который занимал много времени.
Следующая попытка, которую я совершил, скорее всего, испорчена, и обработка ошибок отнюдь не завершена. Может ли кто-нибудь вести меня, что я должен делать в случаях обработки ошибок?
Ниже приведен мой метод, который будет называть process method
из всех комплектов многопоточным способом.
public void processEvents(final Map<String, Object> eventData) {
ExecutorService pool = Executors.newFixedThreadPool(5);
List<ProcessBundleHolderEntry> entries = new ArrayList<ProcessBundleHolderEntry>();
Map<String, String> outputs = (Map<String, String>)eventData.get(BConstants.EVENT_HOLDER);
for (BundleRegistration.BundlesHolderEntry entry : BundleRegistration.getInstance()) {
ProcessBundleHolderEntry processBundleHolderEntry = new ProcessBundleHolderEntry(entry, outputs);
entries.add(processBundleHolderEntry);
}
try {
List<Future<Object>> futures = pool.invokeAll(entries, 30, TimeUnit.SECONDS);
for (int i = 0; i < futures.size(); i++) {
// This works since the list of future objects are in the
// same sequential order as the list of entries
Future<Object> future = futures.get(i);
ProcessBundleHolderEntry entry = entries.get(i);
if (!future.isDone()) {
// log error for this entry
}
}
} catch (InterruptedException e) {
// handle this exception!
}
}
Во-вторых, реализация Callable для потоков:
public class ProcessBundleHolderEntry implements Callable {
private BundleRegistration.BundlesHolderEntry entry;
private Map<String, String> outputs;
public ProcessBundleHolderEntry(BundleRegistration.BundlesHolderEntry entry, Map<String, String> outputs) {
this.entry = entry;
this.outputs = outputs;
}
public Object call() throws Exception {
final Map<String, String> response = entry.getPlugin().process(outputs);
// write to the database.
System.out.println(response);
return response;
}
}
Может кто-нибудь сказать мне, есть ли какие-либо проблемы с вышеизложенным подходом или есть лучше и эффективный способ сделать то же самое ? Я не уверен, есть ли проблема безопасности нитей.
Любая помощь будет оценена по этому вопросу.
Спасибо assylias за предложение. Кроме того, есть еще одна проблема с моим кодом, о котором я должен беспокоиться? Одна из быстрых вещей, которые я хотел сообщить вам, - метод processEvents' будет вызываться каждый раз, и он не будет ждать завершения всех потоков в методе processEvents. Значение метода processEvents будет вызываться асинхронно каждый раз. Поэтому не уверен, приведет ли это к какой-либо проблеме или нет. Любые мысли? – ferhan
Любые идеи о моем вышеупомянутом вопросе ..? Благодарю. – ferhan
Для подробного обзора кода я предлагаю http: //codereview.stackexchange.com/- С точки зрения безопасности потока все зависит от того, изменена ли eventData во время операции или нет. Если это не так, тогда вы в порядке. Что касается вашего кода, 'future.isDone()' просто означает, что задача еще не завершена - это не значит, что есть ошибка. Поэтому я предлагал вместо этого использовать 'future.get()', но это означает, что метод не будет завершен, пока все фьючерсы не будут завершены, что, возможно, не то, что вы хотите ... – assylias