Как известно, ThreadPoolExecutor использует некоторый BlockingQueue в качестве очереди входящих задач. Я хочу, чтобы у ThreadPoolExecutor была очередь для результатов задачи, которые были готовы. Я хочу использовать эту очередь в качестве источника для служб ввода/вывода, которые отправляют или сохраняют эти результаты.Параллельность: как реализовать исполнителя с входящими и исходящими очередями?
Почему я хочу создать отдельную очередь? Потому что я хочу отделить действие от отправки результатов от действия получения результатов. Кроме того, я полагаю, что любые Исключения и Задержки, которые сопровождают операции ввода-вывода, не должны влиять на мой ThreadPoolExecutor, который вычисляет результат.
Я создал некоторую наивную реализацию этого. Я хотел бы получить некоторую критику по этому поводу. Может быть, он может быть реализован с готовыми Java-классами лучше? Я использую Java 7.
public class ThreadPoolWithResultQueue {
interface Callback<T> {
void complete(T t);
}
public abstract static class CallbackTask<T> implements Runnable {
private final Callback callback;
CallbackTask(Callback callback) {
this.callback = callback;
}
public abstract T execute();
final public void run() {
T t = execute();
callback.complete(t);
}
}
public static class CallBackTaskString extends CallbackTask<String> {
public CallBackTaskString(Callback callback) {
super(callback);
}
@Override
public String execute() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
return hashCode() + "-" + System.currentTimeMillis();
}
}
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
final BlockingQueue<String> resultQueue = new LinkedBlockingQueue<String>();
Callback<String> addToQueueCallback = new Callback<String>() {
@Override
public void complete(String s) {
System.out.println("Adding Result To Queue " + s);
resultQueue.add(s); //adding to outgoing queue. some other executor (or same one?) will process it
}
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 1000l, TimeUnit.DAYS, workQueue);
for (int i = 0; i <= 5; i++) {
executor.submit(new CallBackTaskString(addToQueueCallback));
};
System.out.println("All submitted.");
executor.shutdown();
executor.awaitTermination(10l, TimeUnit.SECONDS);
System.out.println("Result queue size " + resultQueue.size());
}
}
Вы положили туда все лишние «обратные вызовы», поэтому код кажется более умным, или вам нужно/хотите сделать это именно так? Прямая реализация просто включает в себя еще одну очередь, и если вы знаете, как ставить вещи в очереди, это должно быть очевидно. – Kayaman
Одно слово: «Исполнитель». –
Kayaman, да его проще добавлять результаты в исходящую очередь прямо в Runnable для выполнения. Но я стараюсь как можно больше отделиться. Во всяком случае, Марко предложил отличную вещь. – MiamiBeach