2015-05-27 2 views
0

У меня есть пул потоков исполнителяThreadPoolExecutor работает только первый работоспособной

private ExecutorService exec=Executors.newCachedThreadPool(); 

и у меня есть класс

class MyCallable implements Callable<String>{ 

    private ReentrantLock lock=new ReentrantLock(); 

    private Condition cond=lock.newCondition(); 


    public String call(){ 

    System.out.println("TEST!!); 
    try{ 
     lock.lock(); 
     cond.await(); 
    }finally{ 
     lock.unlock(); 
    } 
    } 
} 

и я:

for(int i=0;i<10000;i++){ 
    exec.submit(new MyCallable()); 
} 

Я хочу иметь 10000 темы с ожидающими вызовами, но я вижу только один ТЕСТ !! в моих журналах, поэтому он отправляет мою задачу только один раз, почему она зависает ??? Мой вызываемый имеет свой собственный замок в каждом экземпляре объекта, как иметь 1000 ожидающих потоков?

+0

Как долго вы продолжаете ждать окончания цикла 'for (...)' перед завершением программы? – JimmyB

+0

** [Исполнители # newFixedThreadPool, вероятно, помогут вам в том, чего вы хотите достичь] (http://stackoverflow.com/a/949463/3143670) ** –

+0

Спасибо, но я не могу использовать фиксированный пул потоков, я не знаю 't знаю количество вызывающих, 10k просто пример @Hanno, у меня есть только этот цикл, я даже не использую future.get() – avalon

ответ

1

Я взял ваш код, заполнил недостающие биты, исправил ошибки компиляции и запустил его. Он печатает «TEST !!» много времени. Если я уменьшу количество заданий до разумного числа, оно будет завершено. Если я оставлю его на уровне 10 000, тест завершится неудачно с помощью OOME, говорящего, что он не может выделить больше стеков потоков.


Я хочу иметь 10000 темы с ждущих

вызываемых объектов

Мои испытания показывают, что, что много потоков собирается причинить вам бежать из памяти. Это действительно не очень хорошая идея.


Вот точный код, который я использовал:

package test; 

import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 

public class ExecutorTest { 

    public static void main(String[] args) { 
     ExecutorService exec = Executors.newCachedThreadPool(); 
     for (int i = 0; i < 10000; i++) { 
      exec.submit(new MyCallable()); 
     } 
    } 
} 


package test; 

import java.util.concurrent.Callable; 
import java.util.concurrent.locks.Condition; 
import java.util.concurrent.locks.ReentrantLock; 

class MyCallable implements Callable<String> { 

    private ReentrantLock lock = new ReentrantLock(); 

    private Condition cond = lock.newCondition(); 

    public String call() { 

     System.out.println("TEST!!"); 
     try { 
      lock.lock(); 
      cond.await(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } finally { 
      lock.unlock(); 
     } 
     return "foo"; 
    } 
} 

Вот тест while(true) {await}:

[[email protected] ~]$ cat ./workspace/test/src/test/CondAwait.java 
package test; 

import java.util.concurrent.locks.Condition; 
import java.util.concurrent.locks.ReentrantLock; 

public class CondAwait { 

    public static void main(String[] args) throws InterruptedException { 
     ReentrantLock lock = new ReentrantLock(); 

     Condition cond = lock.newCondition(); 
     lock.lock(); 
     while(true){ 
      System.out.println("Hi!"); 
      cond.await(); 
     } 
    } 

} 
[[email protected] ~]$ java -cp ./workspace/test/bin test.CondAwait 
Hi! 
^C[[email protected] ~]$ 

Короче говоря, основной поток замерзает в первой await вызова. .. как я и ожидал.


Я согласен, что `ы не очень хорошая идея, но эти потоки будут как кэш запроса, пользователь отправляет запрос, то процессор должен быть помещен в пул потоков, если мы имеем 10k запросов в секунду вы увидите 10k потоков в пуле. Пожалуйста, предложите, как это сделать лучше.

Используйте пул потоков ограничен и установить обязательно будет примерно равно количеству ядер, которые у вас есть.

Несомненно, у вас не будет 10 000 вызовов, которые будут приостановлены одновременно, но это ХОРОШЕЕ.

Если эти вызывающие вызовы предназначены для имитации запросов, которые ждут несколько секунд для чего-то внешнего ответа, тогда пул потоков до 100 может быть разумным. Но если вы действительно нуждаетесь в массивном параллелизме, вам следует взглянуть на то, что использует селектор NIO, чтобы небольшое число рабочих потоков могло запускать большое количество запросов, чередуя их, а не блокируя ввод-вывод.

+0

Спасибо, это действительно странно, попробуйте запустить (правда) {cond.await();} Спасибо – avalon

+0

Спасибо, я согласен, что это не очень хорошая идея, но эти потоки будут похожи на кеш запросов , пользователь отправляет запрос, его процессор должен быть помещен в пул потоков, если у нас есть 10 тыс. запросов в секунду, вы увидите 10k потоков в пуле. Пожалуйста, предложите, как это сделать лучше. Спасибо – avalon

+0

Спасибо, но почему мне нужно удерживать блокировку, если раньше я вызывал lock.lock(), и это не объясняет, почему я не могу видеть утверждения раньше ожидания. – avalon

0

Посмотрите на класс java.util.concurrent.CyclicBarrier.Как указано в JavaDoc это:

Слуховой синхронизации, которая позволяет набор нитей все ждут друг друга, чтобы достичь общей точки барьера.

Каждый из ваших потоков должен называть метод await барьера. Это означает, что нить приостановлена ​​до тех пор, пока барьер не достигнет ожидаемого количества потоков или не произойдет тайм-аут.

0

Я просто попробовал вас программу, и она отлично работает для меня. Я видел несколько запущенных задач.

public class MyCallable implements Callable<String> { 

    private ReentrantLock lock = new ReentrantLock(); 
    private Condition cond = lock.newCondition(); 

    public String call() { 
     System.out.println("TEST!!"); 
     try { 
      lock.lock(); 
      cond.await(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } finally { 
      lock.unlock(); 
     } 
     return "done"; 
    } 

    public static void main(String[] args) { 
     ExecutorService service = Executors.newCachedThreadPool(); 
     for (int i = 0; i < 10; i++) { 
      service.submit(new MyCallable()); 
     } 
    } 
} 

И я бы предложил 10000 одновременных задач - это не очень хорошая идея. Сколько у вас процессоров? ;)

+0

Спасибо, это не имеет значения для меня, так как мне нужно хранить какие-то потоки, которые будут ждать ответа на rabbitmq, как я могу это сделать? Понятия не имею. – avalon