2012-03-28 4 views
9

Я занимался различными стратегиями для объединения потоков с помощью ThreadPoolExecutor с JDK6. У меня есть очередь приоритетов, но не была уверена, понравилось ли мне, как пул не изменился после keepAliveTime (что вы получаете с неограниченной очередью). Итак, я смотрю на ThreadPoolExecutor, используя политику LinkedBlockingQueue и CallerRuns.Почему ThreadPoolExecutor сокращает потоки ниже corePoolSize после keepAliveTime?

Проблема, с которой я столкнулся сейчас, заключается в том, что бассейн растет, так как документы объясняют, что это необходимо, но после завершения задач и включения keepAliveTime getPoolSize показывает, что пул становится сведенным к нулю. Пример ниже должен позволить вам увидеть основу для моего вопроса:

public class ThreadPoolingDemo { 
    private final static Logger LOGGER = 
     Logger.getLogger(ThreadPoolingDemo.class.getName()); 

    public static void main(String[] args) throws Exception { 
     LOGGER.info("MAIN THREAD:starting"); 
     runCallerTestPlain(); 
    } 

    private static void runCallerTestPlain() throws InterruptedException { 
     //10 core threads, 
     //50 max pool size, 
     //100 tasks in queue, 
     //at max pool and full queue - caller runs task 
     ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 50, 
      5L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(100), 
      new ThreadPoolExecutor.CallerRunsPolicy()); 

     //dump 5000 tasks on the queue 
     for (int i = 0; i < 5000; i++) { 
      tpe.submit(new Runnable() { 
       @Override 
       public void run() { 
        //just to eat some time and give a little feedback 
        for (int j = 0; j < 20; j++) { 
         LOGGER.info("First-batch Task, looping:" + j + "[" 
           + Thread.currentThread().getId() + "]"); 
        } 
       } 
      }, null); 
     } 
     LOGGER.info("MAIN THREAD:!!Done queueing!!"); 

     //check tpe statistics forever 
     while (true) { 
      LOGGER.info("Active count: " + tpe.getActiveCount() + " Pool size: " 
       + tpe.getPoolSize() + " Largest Pool: " + tpe.getLargestPoolSize()); 
      Thread.sleep(1000); 
     } 
    } 
} 

Я нашел старую ошибку, которая, кажется, этот вопрос, но он был закрыт: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6458662. Может ли это по-прежнему присутствовать в версии 1.6 или я что-то упускаю?

Похоже, что я резинка уклонилась от этого (http://www.codinghorror.com/blog/2012/03/rubber-duck-problem-solving.html). Ошибка, связанная выше, связана с этим: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6576792, где проблема, кажется, решена в 1,7 (я загрузил 1,7 и проверен - исправлен ...). На мой взгляд, моя главная проблема заключалась в том, что ошибка эта фундаментальная продолжалась почти десятилетие. Я потратил слишком много времени, написав это, чтобы не публиковать его сейчас, надеюсь, что это поможет кому-то.

+0

+1 Приятная находка, был удивлен, увидев это поведение. –

+1

Возможно, было бы лучше структурировать ваше сообщение как вопрос, а затем предоставить то, что вы узнали, как ответ? –

ответ

6

... после выполнения задач и продолжения keepAliveTime getPoolSize показывает, что пул уменьшается до нуля.

Так что это выглядит как условие гонки в ThreadPoolExecutor. Я предполагаю, что он работает по дизайну, хотя и не ожидается. В getTask() методе, который петля рабочих потоков через, чтобы получить задания из очереди блокировки, вы видите этот код:

if (state == SHUTDOWN) // Help drain queue 
    r = workQueue.poll(); 
else if (poolSize > corePoolSize || allowCoreThreadTimeOut) 
    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); 
else 
    r = workQueue.take(); 
if (r != null) 
    return r; 
if (workerCanExit()) { 
    if (runState >= SHUTDOWN) // Wake up others 
     interruptIdleWorkers(); 
    return null; 
} 

Если poolSize растет над corePoolSize тогда, если время опроса из после keepAliveTime, код падает до workerCanExit() с r является null. Все из потоков может вернуться true из этого метода, так как он просто проверяет состояние из poolSize:

mainLock.lock(); 
    boolean canExit; 
    try { 
     canExit = runState >= STOP || 
      workQueue.isEmpty() || 
      (allowCoreThreadTimeOut && 
      poolSize > Math.max(1, corePoolSize)); << test poolSize here 
    } finally { 
     mainLock.unlock();       << race to workerDone() begins 
    } 

После того, что возвращает true затем выходы потоков рабочих и затемpoolSize декрементируется. Если все рабочие потоки выполняют этот тест в одно и то же время, то все они выйдут из-за гонки между тестированием poolSize и остановкой рабочего при возникновении --poolSize.

Что меня удивляет, насколько согласуется это состояние гонки. Если вы добавите некоторую рандомизацию в sleep() внутри run() ниже, вы можете получить некоторые основные потоки, чтобы не выйти, но я бы подумал, что состояние гонки было бы сложнее.


Вы можете увидеть это поведение в следующем тесте:

@Test 
public void test() throws Exception { 
    int before = Thread.activeCount(); 
    int core = 10; 
    int max = 50; 
    int queueSize = 100; 
    ThreadPoolExecutor tpe = 
      new ThreadPoolExecutor(core, max, 1L, TimeUnit.SECONDS, 
        new LinkedBlockingQueue<Runnable>(queueSize), 
        new ThreadPoolExecutor.CallerRunsPolicy()); 
    tpe.allowCoreThreadTimeOut(false); 
    assertEquals(0, tpe.getActiveCount()); 
    // if we start 1 more than can go into core or queue, poolSize goes to 0 
    int startN = core + queueSize + 1; 
    // if we only start jobs the core can take care of, then it won't go to 0 
    // int startN = core + queueSize; 
    for (int i = 0; i < startN; i++) { 
     tpe.submit(new Runnable() { 
      @Override 
      public void run() { 
       try { 
        Thread.sleep(100); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
     }); 
    } 
    while (true) { 
     System.out.println("active = " + tpe.getActiveCount() + ", poolSize = " + tpe.getPoolSize() 
       + ", largest = " + tpe.getLargestPoolSize() + ", threads = " + (Thread.activeCount() - before)); 
     Thread.sleep(1000); 
    } 
} 

Если изменить sleep линию внутри метода run() к чему-то вроде этого:

private final Random random = new Random(); 
... 
    Thread.sleep(100 + random.nextInt(100)); 

Это сделает состояние гонки сложнее, так что некоторые основные потоки все равно будут вокруг.

+0

OP больше интересуется 'getPoolSize()', а не 'getActiveCount()' После завершения всех задач и окончания keepAliveTime все потоки фактически завершаются, хотя TPE этого не делает. –

+0

@ Мои параметры TPE важны. Это не всегда терпит неудачу. Используя используемые параметры, мы увидим, что он работает должным образом. Параметры, которые я задал, играют с правилами TPE уникальным, но также реалистичным способом (т. Е. Огромное пакетное задание сбрасывается в очередь). Основная проблема с вашими параметрами заключается в том, что вы никогда не запускаете ситуацию, которая заставляет потоки добавлять в пул выше и выше основного размера. Чтобы превысить/добавить размер ядра, пул должен быть размером ядра (10), а очередь должна пройти за его пределы (100). Попробуйте свой пример с 'for (int i = 0; i andematt

+0

@andematt Интересное путешествие. Это состояние гонки в «ThreadPoolExecutor». Я отредактировал свой ответ. – Gray