2017-01-19 3 views
2

я следующая проблема,Как распределить задачи между потоками в ThreadPoolExecutor

У меня есть очереди задач и есть много типов задач, таких как:

A, B, C, D, ... 

я исполняю эти задачи в резьбе бассейн.

Но я должен ограничивать же выполнение задания типа одновременно, следовательно, это плохо:

Thread-1: [A, D, C, B, ...] 
Thread-2: [A, C, D, B, ...] 

Задачи типа А и В могут быть выполнены одновременно.

Но это хорошо:

Thread-1: [A,B,A,B,...] 
Thread-2: [C,D,D,C,...] 

Поэтому задачи одного типа всегда выполняются последовательно.

Что такое самый простой способ реализовать эту функциональность?

+0

Ваш вопрос довольно расплывчатый, не уверен, что я это понимаю. Но вы можете рассмотреть возможность использования только двух отдельных пулов потоков. – Jamie

+0

@Jamie это очень неуклюжие решения. – corvax

+0

Было бы также хорошо? Thread-1 [A1, A2, B1, B2]? – john16384

ответ

0
CompletableFuture.supplyAsync(this::doTaskA) 
       .thenAccept(this::useResultFromTaskAinTaskB); 

То, что происходит выше, что задачи А и связанные задачи B фактически выполняются в том же потоке (один за другим, нет необходимости, чтобы «получить» новый поток для запуска Задачу B).

Или вы можете использовать runAsync для задач А, если вам не нужен какой-либо информации от него, но не нужно ждать его завершения перед запуском задач B.

По умолчанию CompletableFuture будет использовать общую нить пул, но если вы хотите получить больше контроля над тем, какой ThreadPool будет использоваться, вы можете передать второй аргумент методам async со своим собственным Исполнителем, который использует ваш собственный ThreadPool.

1

Эта проблема легко может быть решена с помощью актерской структуры, такой как Akka.

Для каждого типа задач. создать актера.

Для каждой отдельной задачи создайте сообщение и отправьте его актеру соответствующего типа. Сообщения могут быть типа Runnable, так как они, вероятно, в настоящее время, и способ реакции актера может быть @Override public void onReceive(Object msg) { ((Runnable)msg).run(); }

Таким образом, ваша программа будет работать правильно для любого числа потоков.

+0

Да, это может быть легко решено Akka, но я думаю, что это большая накладная задача для интеграции akka для решения только этой проблемы. – corvax

1

Я думаю, что вы можете реализовать свой собственный DistributedThreadPool для управления потоком. Это похоже на какую-то тему подписчика/издателя.

Я сделал пример следующим образом:

class DistributeThreadPool { 

Map<String, TypeThread> TypeCenter = new HashMap<String, TypeThread>(); 

public void execute(Worker command) { 
    TypeCenter.get(command.type).accept(command); 
} 

class TypeThread implements Runnable{ 

    Thread t = null; 
    LinkedBlockingDeque<Runnable> lbq = null; 

    public TypeThread() { 
     lbq = new LinkedBlockingDeque<Runnable>(); 
    } 

    public void accept(Runnable inRun) { 
     lbq.add(inRun); 
    } 

    public void start() { 
     t = new Thread(this); 
     t.start(); 
    } 


    @Override 
    public void run() { 
     while (!Thread.interrupted()) { 
      try { 
       lbq.take().run(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
} 

public DistributeThreadPool(String[] Types) { 
    for (String t : Types) { 
     TypeThread thread = new TypeThread(); 
     TypeCenter.put(t, thread); 
     thread.start(); 
    } 
} 

public static void main(String [] args) { 
     DistributeThreadPool dtp = new DistributeThreadPool(new String[] {"AB","CD"}); 

     Worker w1 = new Worker("AB",()->System.out.println(Thread.currentThread().getName() +"AB")); 
     Worker w2 = new Worker("AB",()->System.out.println(Thread.currentThread().getName() +"AB")); 
     Worker w3 = new Worker("CD",()->System.out.println(Thread.currentThread().getName() +"CD")); 
     Worker w4 = new Worker("CD",()->System.out.println(Thread.currentThread().getName() +"CD")); 
     Worker w5 = new Worker("CD",()->System.out.println(Thread.currentThread().getName() +"CD")); 

     List<Worker> workers = new ArrayList<Worker>(); 
     workers.add(w1); 
     workers.add(w2); 
     workers.add(w3); 
     workers.add(w4); 
     workers.add(w5); 

     workers.forEach(e->dtp.execute(e)); 
    } 
} 
+0

Что делать, если есть 1000 задач? – Anton

+0

Вы хотите, чтобы вы одновременно запускали 1000 задач? @Anton – Chuck

0

Интересная проблема. Приходят на ум два вопроса:

Сколько существует различных типов задач?

Если относительно мало, самым простым способом может быть создание одного потока для каждого типа и назначение каждой входящей задачи в виде потока. Пока задачи сбалансированы между типами (и это большое предположение), использование будет достаточно хорошим.

Какова ожидаемая своевременность/латентность для завершения задачи?

Если ваша проблема гибка на своевременности, вы можете партия входящие задачи каждого вида по количеству или интервалу времени, представить каждую партию вы уединиться в бассейн, а затем ждать завершение партии представить другие тот же вид ,

Вы можете адаптировать вторую альтернативу к размерам партии как одну, и в этом случае механика ожидающего завершения важна для эффективности. CompletableFuture будет соответствовать счету здесь; вы можете связать «опрос следующей задачи типа A и отправить в пул» действие к задаче с thenRunAsync и запустить и забыть задачу.

Вам нужно будет поддерживать одну внешнюю очередь задач для каждого типа задачи; рабочие очереди пула FJ будут выполняться только для выполнения задач. Тем не менее, этот проект имеет хорошие шансы разумно справиться с дисбалансом в подсчете задач и рабочей нагрузке для каждого типа.

Надеюсь, это поможет.

0

Реализация ключа, заказанного исполнителя. Каждая задача должна иметь ключ. Задачи с одинаковыми ключами будут поставлены в очередь и будут выполняться последовательно, задачи с разными ключами будут выполняться параллельно.

Implementation in netty

Вы можете попробовать сделать это самостоятельно, но это сложно и чревато ошибками. Я вижу несколько ошибок в ответе, предложенных там.