Я нашел еще один вариант, чем тот, который был предложен компанией @Carlitos Way. Он состоит в непосредственном добавлении задач в очередь с использованием BlockingQueue.offer
. Единственная причина, по которой мне не удалось заставить ее работать сначала, и мне пришлось опубликовать этот вопрос, я не знал, что поведение по умолчанию ThreadPoolExecutor
начинается без какого-либо потока. Потоки будут создаваться лениво с использованием фабрики потоков и могут быть удалены/повторно заселены в зависимости от основного и максимального размеров пула и количества одновременно задаваемых задач.
С тех пор как создание потоков было ленивым, мои попытки заблокировать звонок по вызову offer
не удались, потому что SynchronousQueue.offer
сразу же выходит, если никто не ждет, чтобы получить элемент из очереди. И наоборот, SynchronousQueue.put
блокируется, пока кто-то не попросит взять элемент из очереди, чего не произойдет, если пул потоков пуст.
Таким образом, обходным путем является принудительный пул потоков для создания основных потоков с использованием ThreadPoolExecutor.prestartAllCoreThreads
. Моя проблема становится довольно тривиальной. Я сделал упрощенную версию моей реальной потребительной случае:
import java.util.Random;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
public class SimplifiedBuildScheduler {
private static final int MAX_POOL_SIZE = 10;
private static final Random random = new Random();
private static final AtomicLong nextTaskId = new AtomicLong(0);
public static void main(String[] args) throws InterruptedException {
SynchronousQueue<Runnable> queue = new SynchronousQueue<>();
// this is a soft requirement in my system, not a real-time guarantee. See the complete semantics in my question.
long maxBuildTimeInMillis = 50;
// this timeout must be small compared to maxBuildTimeInMillis in order to accurately match the maximum build time
long taskSubmissionTimeoutInMillis = 1;
ThreadPoolExecutor pool = new ThreadPoolExecutor(MAX_POOL_SIZE, MAX_POOL_SIZE, 0, SECONDS, queue);
pool.prestartAllCoreThreads();
Runnable nextTask = makeTask(maxBuildTimeInMillis);
long millisAtStart = System.currentTimeMillis();
while (maxBuildTimeInMillis > System.currentTimeMillis() - millisAtStart) {
boolean submitted = queue.offer(nextTask, taskSubmissionTimeoutInMillis, MILLISECONDS);
if (submitted) {
nextTask = makeTask(maxBuildTimeInMillis);
} else {
System.out.println("Task " + nextTaskId.get() + " was not submitted. " + "It will be rescheduled unless " +
"the max build time has expired");
}
}
System.out.println("Max build time has expired. Stop submitting new tasks and running existing tasks to completion");
pool.shutdown();
pool.awaitTermination(9999999, SECONDS);
}
private static Runnable makeTask(long maxBuildTimeInMillis) {
long sleepTimeInMillis = randomSleepTime(maxBuildTimeInMillis);
long taskId = nextTaskId.getAndIncrement();
return() -> {
try {
System.out.println("Task " + taskId + " sleeping for " + sleepTimeInMillis + " ms");
Thread.sleep(sleepTimeInMillis);
System.out.println("Task " + taskId + " completed !");
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
};
}
private static int randomSleepTime(long maxBuildTimeInMillis) {
// voluntarily make it possible that a task finishes after the max build time is expired
return 1 + random.nextInt(2 * Math.toIntExact(maxBuildTimeInMillis));
}
}
Пример вывода является следующее:
Task 1 was not submitted. It will be rescheduled unless the max build time has expired
Task 0 sleeping for 23 ms
Task 1 sleeping for 26 ms
Task 2 sleeping for 6 ms
Task 3 sleeping for 9 ms
Task 4 sleeping for 75 ms
Task 5 sleeping for 35 ms
Task 6 sleeping for 81 ms
Task 8 was not submitted. It will be rescheduled unless the max build time has expired
Task 8 was not submitted. It will be rescheduled unless the max build time has expired
Task 7 sleeping for 86 ms
Task 8 sleeping for 47 ms
Task 9 sleeping for 40 ms
Task 11 was not submitted. It will be rescheduled unless the max build time has expired
Task 2 completed !
Task 10 sleeping for 76 ms
Task 12 was not submitted. It will be rescheduled unless the max build time has expired
Task 3 completed !
Task 11 sleeping for 31 ms
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 13 was not submitted. It will be rescheduled unless the max build time has expired
Task 0 completed !
Task 12 sleeping for 7 ms
Task 14 was not submitted. It will be rescheduled unless the max build time has expired
Task 14 was not submitted. It will be rescheduled unless the max build time has expired
Task 1 completed !
Task 13 sleeping for 40 ms
Task 15 was not submitted. It will be rescheduled unless the max build time has expired
Task 12 completed !
Task 14 sleeping for 93 ms
Task 16 was not submitted. It will be rescheduled unless the max build time has expired
Task 16 was not submitted. It will be rescheduled unless the max build time has expired
Task 16 was not submitted. It will be rescheduled unless the max build time has expired
Task 5 completed !
Task 15 sleeping for 20 ms
Task 17 was not submitted. It will be rescheduled unless the max build time has expired
Task 17 was not submitted. It will be rescheduled unless the max build time has expired
Task 11 completed !
Task 16 sleeping for 27 ms
Task 18 was not submitted. It will be rescheduled unless the max build time has expired
Task 18 was not submitted. It will be rescheduled unless the max build time has expired
Task 9 completed !
Task 17 sleeping for 95 ms
Task 19 was not submitted. It will be rescheduled unless the max build time has expired
Max build time has expired. Stop submitting new tasks and running existing tasks to completion
Task 8 completed !
Task 15 completed !
Task 13 completed !
Task 16 completed !
Task 4 completed !
Task 6 completed !
Task 10 completed !
Task 7 completed !
Task 14 completed !
Task 17 completed !
Вы заметите, например, что задача 19 не было перенесено, поскольку Максимальное время сборки истекло, прежде чем планировщик может попытаться предложить его в очередь во второй раз. Вы также можете увидеть, чем все текущие задачи, которые были запущены до истечения максимального времени сборки, выполняются до завершения.
Примечание: Как отмечались в моих комментариях в коде, максимальное время сборки является мягкого требования, которое означает, что оно не может быть выполнено точно, и мое решение действительно позволяет задача представляется даже после истечения максимального времени сборки. Это может произойти, если вызов offer
начинается непосредственно до истечения максимального времени сборки и заканчивается после. Чтобы уменьшить вероятность этого, важно, чтобы таймаут, используемый при вызове offer
, намного меньше максимального времени сборки. В реальной системе пул потоков обычно занят без холостого потока, поэтому вероятность возникновения этого состояния гонки крайне мала, и это не имеет плохих последствий для системы, когда это происходит, поскольку максимальное время сборки является наилучшие усилия пытаются удовлетворить общее время работы, а не точное и жесткое ограничение.
Да, я думаю, что это сработает, используя сон немного грязно, но это просто, и я предпочитаю, чтобы сложная дополнительная логика.Дело в том, что мне удастся получить это изменение, если я смогу исправить случай редкого края, который мы обнаруживаем с помощью текущего подхода (планируем все сразу + отменить все оставшиеся задачи после таймаута) с более сложной задачей. Спасибо за ваше решение, чтобы понять, как он соответствует моему коду. – Dici
Я столкнулся с одной проблемой с этим подходом: «Runnable», переданный как параметр, не совсем то же самое, что и тот, который был отправлен. Вместо этого он завернут в «FutureTask», который имеет некоторую логику обработки исключений, которая мешает моей работе работать. Вы не могли знать этого в контексте, заданном в моем вопросе, но я добавляю это как предупреждение для будущих читателей. Я могу решить эту проблему, сильно полагаясь на поведение «SynchronousQueue». К сожалению, я не могу передать весь код. Я скажу, дойду ли я до рабочего решения. – Dici
Мне удалось заставить его работать. Это немного грязно, но я рад, что смог удалить 25% существующего кода, улучшив поведение (хотя я потерял одну небольшую функцию об обработке ошибок). Мне кажется, мне все это нравится: p. Я приму этот ответ. – Dici