2010-10-14 6 views
50

Я не могу использовать shutdown() и awaitTermination(), потому что возможно, что новые задачи будут добавлены в ThreadPoolExecutor во время ожидания.Как дождаться завершения всех задач в ThreadPoolExecutor, не закрывая Executor?

Так что я ищу способ подождать, пока ThreadPoolExecutor освободит его очередь и завершит все его задачи, не останавливая новые задачи от добавления до этого момента.

Если это имеет значение, это для Android.

Благодаря

Update: Много недель спустя после пересмотра этого, я обнаружил, что модифицированный CountDownLatch работал лучше для меня в этом случае. Я сохраню ответ, потому что он больше относится к тому, что я спросил.

+0

Если вы в порядке с добавлением новых задач, что произойдет, если оно не закончится? – Kylar

+0

Я думаю, что littleFluffyKitty только хочет дождаться завершения «старых» задач. – Thilo

+2

Я не настолько обеспокоен возможностью, что он никогда не закончится, потому что, если это так, то что-то еще уже ужасно нарушено. Если все остальное не сработает, я смогу реализовать какое-то время, но я согласен с тем, что он закончит. Я хочу, чтобы он мог выполнять новые задачи, пока он ждет, или сказать это по-другому, я хочу, чтобы новые задачи могли быть добавлены после вызова wait. – cottonBallPaws

ответ

64

Если вам интересно узнать, когда какая-либо задача завершена или определенная партия задач, вы можете использовать ExecutorService.submit(Runnable). Вызов этого метода возвращает объект Future, который может быть помещен в Collection, который ваш основной поток будет перебирать по вызову Future.get() для каждого из них. Это приведет к остановке основного потока, пока ExecutorService не обработает все задачи Runnable.

Collection<Future<?>> futures = new LinkedList<Future<?>>(); 
futures.add(executorService.submit(myRunnable)); 
for (Future<?> future:futures) { 
    future.get(); 
} 
+2

+1 Это, кажется, лучший способ. Должно быть сделано на уровне приложения, однако, когда вы отправляете задачу, а не на уровне обслуживания исполнителя. – Thilo

+11

+1 или используйте функцию invokeAll() для пакетной задачи, чтобы дождаться завершения. См. Мой ответ здесь: http://stackoverflow.com/questions/3269445/executorservice-how-to-wait-for-all-tasks-to-finish/3269888 – andersoj

+1

Правильно, если вы готовы выполнить свои задачи 'Callable' вместо «Runnable» решение, на которое ссылается @andersoj, намного проще. –

5

Может быть, вы ищете CompletionService управлять партиями задачи, смотрите также this answer.

+0

ваша ссылка кажется сломанной. – qwertzguy

+0

@qwertzguy: Заменено ламером, более стабильным. Благодарю. – Thilo

3

(Это попытка воспроизвести ранее, удаленный ответ Тило с собственными корректировками.)

Я думаю, что вам, возможно, потребуется уточнить ваш вопрос, поскольку существует неявное бесконечное состояние ... в какой-то момент у вас есть чтобы решить закрыть своего исполнителя, и в этот момент он больше не примет никаких задач. Ваш вопрос, кажется, подразумевает, что вы хотите подождать, пока не найдете , что дальнейшие задачи не будут отправлены, о которых вы можете узнать только в своем собственном коде приложения.

Следующий ответ позволит вам плавно перейти на новый TPE (по любой причине), выполнив все текущие задачи и не отказываясь от новых задач для нового TPE. Это может ответить на ваш вопрос. @ Тило может также.

Предполагая, что вы определили где видимый TPE используется как таковой:

AtomicReference<ThreadPoolExecutor> publiclyAvailableTPE = ...; 

Вы можете написать TPE подкачки рутину как таковой. Она также может быть написана с использованием синхронизированного метода, но я думаю, что это проще:

void rotateTPE() 
{ 
    ThreadPoolExecutor newTPE = createNewTPE(); 
    // atomic swap with publicly-visible TPE 
    ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(newTPE); 
    oldTPE.shutdown(); 

    // and if you want this method to block awaiting completion of old tasks in 
    // the previously visible TPE 
    oldTPE.awaitTermination(); 
} 

В качестве альтернативы, если вы действительно не шутите хотите, чтобы убить пул потоков, то ваша стороне податель нужна будет справиться с отклоненными задачами в какой-то момент, и вы можете использовать null для нового TPE:

void killTPE() 
{ 
    ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(null); 
    oldTPE.shutdown(); 

    // and if you want this method to block awaiting completion of old tasks in 
    // the previously visible TPE 
    oldTPE.awaitTermination(); 
} 

что может вызвать проблемы вверх по течению, вызывающий должен был бы знать, что делать с null.

Вы также можете поменять местами с помощью фиктивного TPE, который просто отклонил каждое новое исполнение, но это эквивалентно тому, что происходит, если вы вызываете shutdown() на TPE.

+0

Спасибо, что нашли время, чтобы написать это, я посмотрю на все и посмотрю, какой маршрут работает лучше всего. – cottonBallPaws

7

My Scenario - это веб-искатель для получения некоторой информации с веб-сайта, а затем обработки. ThreadPoolExecutor используется для ускорения процесса, потому что во время загрузки может быть загружено много страниц. Таким образом, новые задачи будут созданы в существующей задаче, потому что искатель будет следовать за гиперссылками на каждой странице. Проблема такая же: основной поток не знает, когда все задачи завершены, и он может начать обрабатывать результат. Я использую простой способ определить это. Это не очень элегантно, но работает в моем случае:

while (executor.getTaskCount()!=executor.getCompletedTaskCount()){ 
    System.err.println("count="+executor.getTaskCount()+","+executor.getCompletedTaskCount()); 
    Thread.sleep(5000); 
} 
executor.shutdown(); 
executor.awaitTermination(60, TimeUnit.SECONDS); 
1

Если вы не хотите использовать shutdown, следуйте ниже подходов:

  1. перебрать все Future задач от подавать на ExecutorService и проверки статус с блокированием вызова get() на Future объекта как предложено Tim Bender

  2. Используйте один из

    1. Использование invokeAll на ExecutorService
    2. Использование CountDownLatch
    3. Использование ForkJoinPool или newWorkStealingPool из Executors (с Java 8)

invokeAll() по исполнителю службы также достигает той же цели CountDownLatch

Связанные SE вопрос:

How to wait for a number of threads to complete?

0

Вы могли бы назвать waitTillDone() на Runner класс:

Runner runner = Runner.runner(10); 

runner.runIn(2, SECONDS, runnable); 
runner.run(runnable); // each of this runnables could submit more tasks 

runner.waitTillDone(); // blocks until all tasks are finished (or failed) 

// and now reuse it 

runner.runIn(500, MILLISECONDS, callable); 

runner.waitTillDone(); 
runner.shutdown(); 

Чтобы использовать его добавить Gradle/Maven зависимости к проекту: 'com.github.matejtymes:javafixes:1.0'

Для получения более подробной информации смотрите здесь: https://github.com/MatejTymes/JavaFixes или здесь: http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html

 Смежные вопросы

  • Нет связанных вопросов^_^