2016-09-22 10 views
4

Как известно, ThreadPoolExecutor использует некоторый BlockingQueue в качестве очереди входящих задач. Я хочу, чтобы у ThreadPoolExecutor была очередь для результатов задачи, которые были готовы. Я хочу использовать эту очередь в качестве источника для служб ввода/вывода, которые отправляют или сохраняют эти результаты.Параллельность: как реализовать исполнителя с входящими и исходящими очередями?

Почему я хочу создать отдельную очередь? Потому что я хочу отделить действие от отправки результатов от действия получения результатов. Кроме того, я полагаю, что любые Исключения и Задержки, которые сопровождают операции ввода-вывода, не должны влиять на мой ThreadPoolExecutor, который вычисляет результат.

Я создал некоторую наивную реализацию этого. Я хотел бы получить некоторую критику по этому поводу. Может быть, он может быть реализован с готовыми Java-классами лучше? Я использую Java 7.

public class ThreadPoolWithResultQueue { 
    interface Callback<T> { 
     void complete(T t); 
    } 
    public abstract static class CallbackTask<T> implements Runnable { 
     private final Callback callback; 
     CallbackTask(Callback callback) { 
      this.callback = callback; 
     }  
     public abstract T execute(); 
     final public void run() { 
      T t = execute(); 
      callback.complete(t); 
     } 
    } 
    public static class CallBackTaskString extends CallbackTask<String> { 
     public CallBackTaskString(Callback callback) { 
      super(callback); 
     } 
     @Override 
     public String execute() { 
      try { 
       Thread.sleep(3000); 
      } catch (InterruptedException e) { 
      } 
      return hashCode() + "-" + System.currentTimeMillis(); 
     } 
    }  
    public static void main(String[] args) throws InterruptedException { 
     BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(); 
     final BlockingQueue<String> resultQueue = new LinkedBlockingQueue<String>(); 
     Callback<String> addToQueueCallback = new Callback<String>() { 
      @Override 
      public void complete(String s) { 
       System.out.println("Adding Result To Queue " + s); 
       resultQueue.add(s); //adding to outgoing queue. some other executor (or same one?) will process it 
      } 
     }; 
     ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 1000l, TimeUnit.DAYS, workQueue); 
     for (int i = 0; i <= 5; i++) { 
      executor.submit(new CallBackTaskString(addToQueueCallback)); 
     }; 
     System.out.println("All submitted."); 
     executor.shutdown(); 
     executor.awaitTermination(10l, TimeUnit.SECONDS); 
     System.out.println("Result queue size " + resultQueue.size()); 
    } 
} 
+0

Вы положили туда все лишние «обратные вызовы», поэтому код кажется более умным, или вам нужно/хотите сделать это именно так? Прямая реализация просто включает в себя еще одну очередь, и если вы знаете, как ставить вещи в очереди, это должно быть очевидно. – Kayaman

+4

Одно слово: «Исполнитель». –

+0

Kayaman, да его проще добавлять результаты в исходящую очередь прямо в Runnable для выполнения. Но я стараюсь как можно больше отделиться. Во всяком случае, Марко предложил отличную вещь. – MiamiBeach

ответ

0

Ради makinf компонент библиотеки, вы должны обернуть вещи ...

Вы можете расширить ИСПОЛНИТЕЛЬ пул потоков, который имеет целый ряд методов перехвата отправленные задачи, поэтому вы ставите очередь в очередь, переданную в конструкторе.

Это в основном ExecutorCompletionService, но вы позволите пользователю подключить очередь вместо того, чтобы появляться как один.

В противном случае это типичное проксирование задачи. Справедливая работа.