2

Я пытаюсь найти способ использовать ThreadPoolExecutor по следующему сценарию:Синхронный производитель задачи/потребитель, используя ThreadPoolExecutor

  • У меня есть отдельный поток производства и подачи задач на пуле потоков
  • представление задачи является синхронным и будет блокироваться до тех пор, пока задача не может быть запущена ThreadPoolExecutor
  • в любой момент времени может выполняться параллельно только определенное количество задач. Неограниченное количество задач, выполняемых одновременно, может привести к исчерпанию памяти.
  • перед отправкой задачи нить производителя всегда проверяет, что с момента первой поставленной задачи не было превышено максимальное время сборки. Если он был превышен, поток отключается, но любая задача, выполняемая в настоящее время в пуле потоков, заканчивается до завершения приложения.
  • , когда прекращается нить производителя, в очереди пула потоков не должно быть нераспределенной задачи.

Чтобы предоставить больше контекста, я в настоящее время просто отправлю все задачи сразу и отменил все фьючерсы, возвращенные ExecutorService.submit после истечения максимального времени сборки. Я игнорирую все полученные CancellationException s, так как они ожидаются. Проблема заключается в том, что поведение Future.cancel(false) нечетно и неадекватное мои потребительных дела:

  • это предотвращает Не начата задаче запустить (хорошо)
  • не прерывать запущенные задачи и позволить им бежать к завершению (хорошо)
  • однако он игнорирует любые исключения, вызванные текущими задачами, и вместо этого выбрасывает CancellationException, для которых Exception.getCause() - null. Поэтому я не могу отличить задачу, которая была отменена до запуска из задачи, которая продолжала работать после максимального времени сборки и завершилась неудачей с исключением! Это несчастливо, потому что в этом случае я хотел бы распространять исключение и сообщать об этом в некоторый механизм обработки ошибок.

Я рассмотрел различные блокирующие очереди Java, которые может предложить и нашел это: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/SynchronousQueue.html. Это казалось идеальным на первый, но, глядя на https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html, это, кажется, не играть с ThreadPoolExecutor так, как я хочу его:

Прямые передачи обслуживания. Хорошим выбором по умолчанию для рабочей очереди является SynchronousQueue, который снимает задачи с потоками, иначе их удерживает. Здесь попытка очереди на выполнение задачи завершится неудачей, если потоки не будут доступны для ее запуска, поэтому будет создан новый поток . Эта политика позволяет избежать блокировок при обработке наборов запросов , которые могут иметь внутренние зависимости. Прямые передачи обслуживания, как правило, требуют неограниченных максимальных значений для предотвращения отклонения новых заданных задач . Это, в свою очередь, допускает возможность неограниченного роста потоков , когда команды продолжают поступать в среднем быстрее, чем они могут быть обработаны .

Что было бы идеальным является то, что потребитель (= пул) блоков на SynchronousQueue.poll и продюсер (= задача производителя ниток) блоков на SynchronousQueue.put.

Любая идея, как я могу реализовать описанный мной сценарий без написания какой-либо сложной логики планирования (что должно быть для меня заключено ThreadPoolExecutor)?

ответ

0

Я нашел еще один вариант, чем тот, который был предложен компанией @Carlitos Way. Он состоит в непосредственном добавлении задач в очередь с использованием BlockingQueue.offer. Единственная причина, по которой мне не удалось заставить ее работать сначала, и мне пришлось опубликовать этот вопрос, я не знал, что поведение по умолчанию ThreadPoolExecutor начинается без какого-либо потока. Потоки будут создаваться лениво с использованием фабрики потоков и могут быть удалены/повторно заселены в зависимости от основного и максимального размеров пула и количества одновременно задаваемых задач.

С тех пор как создание потоков было ленивым, мои попытки заблокировать звонок по вызову offer не удались, потому что SynchronousQueue.offer сразу же выходит, если никто не ждет, чтобы получить элемент из очереди. И наоборот, SynchronousQueue.put блокируется, пока кто-то не попросит взять элемент из очереди, чего не произойдет, если пул потоков пуст.

Таким образом, обходным путем является принудительный пул потоков для создания основных потоков с использованием ThreadPoolExecutor.prestartAllCoreThreads. Моя проблема становится довольно тривиальной. Я сделал упрощенную версию моей реальной потребительной случае:

import java.util.Random; 
import java.util.concurrent.SynchronousQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.atomic.AtomicLong; 

import static java.util.concurrent.TimeUnit.MILLISECONDS; 
import static java.util.concurrent.TimeUnit.SECONDS; 

public class SimplifiedBuildScheduler { 
    private static final int MAX_POOL_SIZE = 10; 

    private static final Random random = new Random(); 
    private static final AtomicLong nextTaskId = new AtomicLong(0); 

    public static void main(String[] args) throws InterruptedException { 
     SynchronousQueue<Runnable> queue = new SynchronousQueue<>(); 

     // this is a soft requirement in my system, not a real-time guarantee. See the complete semantics in my question. 
     long maxBuildTimeInMillis = 50; 
     // this timeout must be small compared to maxBuildTimeInMillis in order to accurately match the maximum build time 
     long taskSubmissionTimeoutInMillis = 1; 

     ThreadPoolExecutor pool = new ThreadPoolExecutor(MAX_POOL_SIZE, MAX_POOL_SIZE, 0, SECONDS, queue); 
     pool.prestartAllCoreThreads(); 

     Runnable nextTask = makeTask(maxBuildTimeInMillis); 

     long millisAtStart = System.currentTimeMillis(); 
     while (maxBuildTimeInMillis > System.currentTimeMillis() - millisAtStart) { 
      boolean submitted = queue.offer(nextTask, taskSubmissionTimeoutInMillis, MILLISECONDS); 
      if (submitted) { 
       nextTask = makeTask(maxBuildTimeInMillis); 
      } else { 
       System.out.println("Task " + nextTaskId.get() + " was not submitted. " + "It will be rescheduled unless " + 
         "the max build time has expired"); 
      } 
     } 

     System.out.println("Max build time has expired. Stop submitting new tasks and running existing tasks to completion"); 

     pool.shutdown(); 
     pool.awaitTermination(9999999, SECONDS); 
    } 

    private static Runnable makeTask(long maxBuildTimeInMillis) { 
     long sleepTimeInMillis = randomSleepTime(maxBuildTimeInMillis); 
     long taskId = nextTaskId.getAndIncrement(); 
     return() -> { 
      try { 
       System.out.println("Task " + taskId + " sleeping for " + sleepTimeInMillis + " ms"); 
       Thread.sleep(sleepTimeInMillis); 
       System.out.println("Task " + taskId + " completed !"); 
      } catch (InterruptedException ex) { 
       throw new RuntimeException(ex); 
      } 
     }; 
    } 

    private static int randomSleepTime(long maxBuildTimeInMillis) { 
     // voluntarily make it possible that a task finishes after the max build time is expired 
     return 1 + random.nextInt(2 * Math.toIntExact(maxBuildTimeInMillis)); 
    } 
} 

Пример вывода является следующее:

Task 1 was not submitted. It will be rescheduled unless the max build time has expired 
Task 0 sleeping for 23 ms 
Task 1 sleeping for 26 ms 
Task 2 sleeping for 6 ms 
Task 3 sleeping for 9 ms 
Task 4 sleeping for 75 ms 
Task 5 sleeping for 35 ms 
Task 6 sleeping for 81 ms 
Task 8 was not submitted. It will be rescheduled unless the max build time has expired 
Task 8 was not submitted. It will be rescheduled unless the max build time has expired 
Task 7 sleeping for 86 ms 
Task 8 sleeping for 47 ms 
Task 9 sleeping for 40 ms 
Task 11 was not submitted. It will be rescheduled unless the max build time has expired 
Task 2 completed ! 
Task 10 sleeping for 76 ms 
Task 12 was not submitted. It will be rescheduled unless the max build time has expired 
Task 3 completed ! 
Task 11 sleeping for 31 ms 
Task 13 was not submitted. It will be rescheduled unless the max build time has expired 
Task 13 was not submitted. It will be rescheduled unless the max build time has expired 
Task 13 was not submitted. It will be rescheduled unless the max build time has expired 
Task 13 was not submitted. It will be rescheduled unless the max build time has expired 
Task 13 was not submitted. It will be rescheduled unless the max build time has expired 
Task 13 was not submitted. It will be rescheduled unless the max build time has expired 
Task 0 completed ! 
Task 12 sleeping for 7 ms 
Task 14 was not submitted. It will be rescheduled unless the max build time has expired 
Task 14 was not submitted. It will be rescheduled unless the max build time has expired 
Task 1 completed ! 
Task 13 sleeping for 40 ms 
Task 15 was not submitted. It will be rescheduled unless the max build time has expired 
Task 12 completed ! 
Task 14 sleeping for 93 ms 
Task 16 was not submitted. It will be rescheduled unless the max build time has expired 
Task 16 was not submitted. It will be rescheduled unless the max build time has expired 
Task 16 was not submitted. It will be rescheduled unless the max build time has expired 
Task 5 completed ! 
Task 15 sleeping for 20 ms 
Task 17 was not submitted. It will be rescheduled unless the max build time has expired 
Task 17 was not submitted. It will be rescheduled unless the max build time has expired 
Task 11 completed ! 
Task 16 sleeping for 27 ms 
Task 18 was not submitted. It will be rescheduled unless the max build time has expired 
Task 18 was not submitted. It will be rescheduled unless the max build time has expired 
Task 9 completed ! 
Task 17 sleeping for 95 ms 
Task 19 was not submitted. It will be rescheduled unless the max build time has expired 
Max build time has expired. Stop submitting new tasks and running existing tasks to completion 
Task 8 completed ! 
Task 15 completed ! 
Task 13 completed ! 
Task 16 completed ! 
Task 4 completed ! 
Task 6 completed ! 
Task 10 completed ! 
Task 7 completed ! 
Task 14 completed ! 
Task 17 completed ! 

Вы заметите, например, что задача 19 не было перенесено, поскольку Максимальное время сборки истекло, прежде чем планировщик может попытаться предложить его в очередь во второй раз. Вы также можете увидеть, чем все текущие задачи, которые были запущены до истечения максимального времени сборки, выполняются до завершения.

Примечание: Как отмечались в моих комментариях в коде, максимальное время сборки является мягкого требования, которое означает, что оно не может быть выполнено точно, и мое решение действительно позволяет задача представляется даже после истечения максимального времени сборки. Это может произойти, если вызов offer начинается непосредственно до истечения максимального времени сборки и заканчивается после. Чтобы уменьшить вероятность этого, важно, чтобы таймаут, используемый при вызове offer, намного меньше максимального времени сборки. В реальной системе пул потоков обычно занят без холостого потока, поэтому вероятность возникновения этого состояния гонки крайне мала, и это не имеет плохих последствий для системы, когда это происходит, поскольку максимальное время сборки является наилучшие усилия пытаются удовлетворить общее время работы, а не точное и жесткое ограничение.

1

Я верю, что вы на правильном пути ... все, что вам нужно сделать, это использовать SynchronousQueue в сочетании с RejectedExecutionHandler, используя следующие constructor ... таким образом вы можете определить фиксированную максимальную длину потока бассейн (ограничивая использование ресурсов) и определить резервный механизм для повторного планирования тех задач, которые не могут быть обработаны (потому что бассейн был полон) ... Пример:

public class Experiment { 

    public static final long HANDLER_SLEEP_TIME = 4000; 
    public static final int MAX_POOL_SIZE = 1; 

    public static void main(String[] args) throws InterruptedException { 
     SynchronousQueue<Runnable> queue; 
     RejectedExecutionHandler handler; 
     ThreadPoolExecutor pool; 
     Runnable runA, runB; 

     queue = new SynchronousQueue<>(); 
     handler = new RejectedExecutionHandler() { 
      @Override 
      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
       try { 
        System.out.println("Handler invoked! Thread: " + Thread.currentThread().getName()); 
        Thread.sleep(HANDLER_SLEEP_TIME); // this let runnableA finish 
        executor.submit(r); // re schedule 

       } catch (InterruptedException ex) { 
        throw new RuntimeException("Handler Exception!", ex); 
       } 
      } 
     }; 

     pool = new ThreadPoolExecutor(1, MAX_POOL_SIZE, 10, TimeUnit.SECONDS, queue, handler); 
     runA = new Runnable() { 
      @Override 
      public void run() { 
       try { 
        Thread.sleep(3000); 
        System.out.println("hello, I'm runnable A"); 

       } catch (Exception ex) { 
        throw new RuntimeException("RunnableA", ex); 
       } 
      } 
     }; 
     runB = new Runnable() { 
      @Override 
      public void run() { 
       System.out.println("hello, I'm runnable B"); 
      } 
     }; 

     pool.submit(runA); 
     pool.submit(runB); 
     pool.shutdown(); 
    } 
} 

ПРИМЕЧАНИЯ: реализация RejectedExecutionHandler до вы! Я просто предлагаю спать как механизм блокировки, но если вы можете сделать логику более сложной, попросите threadpool, у нее есть свободные потоки или нет. Если нет, то спать; если да, то снова отправьте задание ...

+0

Да, я думаю, что это сработает, используя сон немного грязно, но это просто, и я предпочитаю, чтобы сложная дополнительная логика.Дело в том, что мне удастся получить это изменение, если я смогу исправить случай редкого края, который мы обнаруживаем с помощью текущего подхода (планируем все сразу + отменить все оставшиеся задачи после таймаута) с более сложной задачей. Спасибо за ваше решение, чтобы понять, как он соответствует моему коду. – Dici

+1

Я столкнулся с одной проблемой с этим подходом: «Runnable», переданный как параметр, не совсем то же самое, что и тот, который был отправлен. Вместо этого он завернут в «FutureTask», который имеет некоторую логику обработки исключений, которая мешает моей работе работать. Вы не могли знать этого в контексте, заданном в моем вопросе, но я добавляю это как предупреждение для будущих читателей. Я могу решить эту проблему, сильно полагаясь на поведение «SynchronousQueue». К сожалению, я не могу передать весь код. Я скажу, дойду ли я до рабочего решения. – Dici

+1

Мне удалось заставить его работать. Это немного грязно, но я рад, что смог удалить 25% существующего кода, улучшив поведение (хотя я потерял одну небольшую функцию об обработке ошибок). Мне кажется, мне все это нравится: p. Я приму этот ответ. – Dici