У меня есть метод с именем process
в двух моих классах, скажем CLASS-A and CLASS-B
. Теперь в нижнем цикле я вызываю process method
обоих моих классов, последовательно означающих один за другим, и он отлично работает, но это не тот способ, который я ищу.Как называть тот же метод другого класса многопоточным способом
for (ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) {
final Map<String, String> response = entry.getPlugin().process(outputs);
// write to database
System.out.println(response);
}
Есть ли способ, я могу назвать метод процесса обоих моих классов многопоточным способом. Значение одного потока вызовет метод процесса CLASS-A, а второй поток вызовет метод процесса CLASS-B.
И вот после этого я думал записать данные, возвращаемые методом process
в базу данных. Поэтому у меня есть еще один поток для записи в базу данных.
Ниже приведен код, который я придумал многопоточным способом, но каким-то образом он не работает вообще.
public void writeEvents(final Map<String, Object> data) {
// Three threads: one thread for the database writer, two threads for the plugin processors
final ExecutorService executor = Executors.newFixedThreadPool(3);
final BlockingQueue<Map<String, String>> queue = new LinkedBlockingQueue<Map<String, String>>();
@SuppressWarnings("unchecked")
final Map<String, String> outputs = (Map<String, String>)data.get(ModelConstants.EVENT_HOLDER);
for (final ModuleRegistration.ModulesHolderEntry entry : ModuleRegistration.getInstance()) {
executor.submit(new Runnable() {
public void run() {
final Map<String, String> response = entry.getPlugin().process(outputs);
// put the response map in the queue for the database to read
queue.offer(response);
}
});
}
Future<?> future = executor.submit(new Runnable() {
public void run() {
Map<String, String> map;
try {
while(true) {
// blocks until a map is available in the queue, or until interrupted
map = queue.take();
// write map to database
System.out.println(map);
}
} catch (InterruptedException ex) {
// IF we're catching InterruptedException then this means that future.cancel(true)
// was called, which means that the plugin processors are finished;
// process the rest of the queue and then exit
while((map = queue.poll()) != null) {
// write map to database
System.out.println(map);
}
}
}
});
// this interrupts the database thread, which sends it into its catch block
// where it processes the rest of the queue and exits
future.cancel(true); // interrupt database thread
// wait for the threads to finish
try {
executor.awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
//log error here
}
}
Но если я удалить последнюю строку executor.awaitTermination(5, TimeUnit.MINUTES);
затем начать работать нормально, и через какое-то время, я всегда получаю сообщение об ошибке, как this-
JVMDUMP006I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" - please wait.
JVMDUMP032I JVM requested Heap dump using 'S:\GitViews\Stream\goldseye\heapdump.20130827.142415.16456.0001.phd' in response to an event
JVMDUMP010I Heap dump written to S:\GitViews\Stream\goldseye\heapdump.20130827.142415.16456.0001.phd
JVMDUMP006I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" - please wait.
Может кто-нибудь мне помочь в выяснении, что проблема и что я делаю в моем вышеприведенном коде? если я запускаю последовательно, то я не получаю никаких ошибок, и он работает нормально.
А также есть ли лучший способ сделать это по сравнению с тем, как я это делаю? Потому что в будущем у меня может быть несколько процессоров плагинов по сравнению с двумя.
То, что я пытаюсь сделать, - вызвать метод обработки обоих моих классов многопоточным способом, а затем записать в базу данных. Bcoz мой метод процесса вернет обратно карту.
Любая помощь будет оценена по этому вопросу. И, если возможно, я ищу полезный пример. Спасибо за помощь,
Я не специалист по этим вещам, поэтому мне пришлось искать много вещей. Но я был бы обеспокоен тем, что 'future.cancel' может прервать что-то важное (может даже прервать операции« BlockingQueue », которые должны быть атомарными). Вы пробовали какой-то другой метод синхронизации? Например. добавьте volatile boolean в ваш анонимный Runnable, добавьте метод его установки, запустите цикл в Runnable exit, когда булев установлен, а другой класс использует метод set, а не 'future.cancel'. Таким образом, у вас будет больше контроля. Но я просто догадываюсь. – ajb
Выполнение того, что я предложил, означает, что вам, вероятно, придется написать именованный подкласс «Runnable» и объявить переменную этого класса, чтобы вы могли вызвать метод «set my cancel flag». – ajb