0

Мне нужно распараллелить существующую фоновое задание, чтобы вместо последовательного использования ресурсов «x» он параллельно заканчивал работу, используя только «y» потоков (y < < x). Эта задача постоянно работает в фоновом режиме и продолжает обрабатывать некоторые ресурсы.Ждите потоков демонов для завершения итерации с использованием службы-исполнителя

код структурирована следующим образом:

class BaseBackground implements Runnable { 
    @Override 
    public void run() { 
     int[] resources = findResources(...); 

     for (int resource : resources) { 
      processResource(resource); 
     } 

     stopProcessing(); 
    } 

    public abstract void processResource(final int resource); 
    public void void stopProcessing() { 
     // Override by subclass as needed 
    } 
} 

class ChildBackground extends BaseBackground { 

    @Override 
    public abstract void processResource(final int resource) { 
     // does some work here 
    } 

    public void void stopProcessing() { 
     // reset some counts and emit metrics 
    } 
} 

Я изменил ChildBackground следующим образом:

class ChildBackground extends BaseBackground { 

    private final BlockingQueue<Integer> resourcesToBeProcessed; 

    public ChildBackground() { 
     ExecutorService executorService = Executors.newFixedThreadPool(2); 
     for (int i = 0; i < 2; ++i) { 
      executorService.submit(new ResourceProcessor()); 
     } 
    } 

    @Override 
    public abstract void processResource(final int resource) { 
     resourcesToBeProcessed.add(resource); 
    } 

    public void void stopProcessing() { 
     // reset some counts and emit metrics 
    } 

    public class ResourceProcessor implements Runnable { 
     @Override 
     public void run() { 
      while (true) { 
       int nextResource = resourcesToBeProcessed.take(); 
       // does some work 
      } 
     } 
    } 
} 

Я не создающего и срывая ExecutorService каждый раз, потому что сбор мусора немного проблемы в моей службе. Хотя, я не понимаю, насколько это плохо, так как я не буду порождать более 10 потоков на каждой итерации.

Я не могу понять, как подождать, пока все ResourceProcessor s закончит обработку ресурсов для одной итерации, чтобы я мог сбросить некоторые значения и исправить показатели в stopProcessing. Я рассмотрел следующие варианты:

1) executorService.awaitTermination (тайм-аут). Это не будет работать, поскольку он всегда будет блокироваться до таймаута, потому что потоки ResourceProcessor никогда не закончат работу

2) Я могу узнать количество ресурсов после findResources и сделать его доступным для дочернего класса и каждый ResourceProcessor увеличивает количество обработанных ресурсов. Мне нужно будет подождать, пока все ресурсы будут обработаны в stopProcessing до сброса счетчиков. Мне нужно что-то вроде CountDownLatch, но вместо этого оно должно считаться UP. В этом варианте будет много государственного управления, которого я не особо люблю.

3) Я мог бы обновить public abstract void processResource(final int resource), чтобы включить подсчет общих ресурсов и выполнить дочерний процесс, пока все потоки не обработают общие ресурсы. В этом случае также будет некоторое управление состоянием, но оно будет ограничено дочерним классом.

В любом из двух случаев, я должен будет добавить wait() & notify() логику, но я не уверен в моем подходе. Это то, что я имею:

class ChildBackground extends BaseBackground { 

    private static final int UNSET_TOTAL_RESOURCES = -1; 

    private final BlockingQueue<Integer> resourcesToBeProcessed; 

    private int totalResources = UNSET_TOTAL_RESOURCES; 
    private final AtomicInteger resourcesProcessed = new AtomicInteger(0); 

    public ChildBackground() { 
     ExecutorService executorService = Executors.newFixedThreadPool(2); 
     for (int i = 0; i < 2; ++i) { 
      executorService.submit(new ResourceProcessor()); 
     } 
    } 

    @Override 
    public abstract void processResource(final int resource, final int totalResources) { 
     if (this.totalResources == UNSET_TOTAL_RESOURCES) { 
      this.totalResources = totalResources; 
     } else { 
      Preconditions.checkState(this.totalResources == totalResources, "Consecutive poll requests are using different total resources count, previous=%s, new=%s", this.totalResources, totalResources); 
     } 
     resourcesToBeProcessed.add(resource); 
    } 

    public void void stopProcessing() { 
     try { 
      waitForAllResourcesToBeProcessed(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
     resourcesProcessed.set(0); 
     totalResources = UNSET_TOTAL_RESOURCES; 
     // reset some counts and emit metrics 
    } 

    private void incrementProcessedResources() { 
     synchronized (resourcesProcessed) { 
      resourcesProcessed.getAndIncrement(); 
      resourcesProcessed.notify(); 
     } 
    } 

    private void waitForAllResourcesToBeProcessed() throws InterruptedException { 
     synchronized (resourcesProcessed) { 
      while (resourcesProcessed.get() != totalResources) { 
       resourcesProcessed.wait(); 
      } 
     } 
    } 

    public class ResourceProcessor implements Runnable { 
     @Override 
     public void run() { 
      while (true) { 
       int nextResource = resourcesToBeProcessed.take(); 
       try { 
        // does some work 
       } finally { 
        incrementProcessedResources(); 
       } 
      } 
     } 
    } 
} 

Я не уверен, что при использовании AtomicInteger правильный способ сделать это, и если да, то мне нужно вызвать ожидание() и уведомить(). Если я не использую wait() и notify(), мне даже не нужно выполнять все в синхронизированном блоке.

Пожалуйста, дайте мне знать ваши мысли об этом подходе, если я просто должен создать и выключить ExecutorService для каждой итерации или есть четвертый подход, который я должен преследовать.

+1

Посмотрите на «Callables and Futures» на этой странице http://winterbe.com/posts/2015/04/07/java8-concurrency-tutorial-thread-executor-examples/ –

+0

Фьючерсы не являются опцией потому что потоки исполнителей выполняются в замкнутом цикле, который останавливается только тогда, когда служба деактивирована. – user1071840

+1

Будучи уведомленным о том, что задание, выполняемое исполнителем, является целью мысли фьючерсов. Конечно, вы не можете использовать, например, https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#allOf-java.util.concurrent.CompletableFuture...- ? Вы отправляете одну партию ресурса исполнителям (накапливая 1 будущее по ресурсу), создаете будущее, которое ждет их объединения, и сделайте последнее вычисление ваших показателей. После представления этого последнего будущего вы могли бы забыть обо всех них. В реактивном стиле. – GPI

ответ

1

Ваш код кажется излишне сложным. Почему у вас есть собственная очередь, когда есть очередь уже внутри ExecutorService? Вам нужно сделать целую кучу администрирования, когда я думаю, что вы можете позволить запасу ExecutorService обрабатывать его для вас.

Я бы определить свои рабочие места, как:

public static class ResourceProcessor implements Runnable { 
    private final int resource; 
    public ResourceProcessor(int resource) { 
     this.resource = resource; 
    } 
    public void run() { 
     try { 
     // does some work 
     } finally { 
     // if this is still necessary then you should use a `Future` instead 
     incrementProcessedResources(); 
     } 
    } 
} 

Тогда вы могли бы представить им нравится:

ExecutorService executorService = Executors.newFixedThreadPool(2); 
for (int i = 0; i < totalResources; ++i) { 
    executorService.submit(new ResourceProcessor(i)); 
} 
// shutdown the thread pool after the last submit 
executorService.shutdown(); 

executorService.awaitTermination(timeout). Это не будет работать, так как он всегда будет блокироваться до таймаута, потому что потоки ResourceProcessor никогда не закончат работу.

Это будет работать.

2) Я могу узнать количество ресурсов [закончено].

Вам все еще нужно это, если вы можете позвонить awaitTermination(...)?

3) Я мог бы обновить общественный абстрактный недействительный processResource (конечный ИНТ ресурс), чтобы включить подсчет общих ресурсов и иметь дочерний процесс ждать, пока все потоки не обработают все ресурсы ...

Same вопрос. Это необходимо?

Если вам действительно нужно знать список обработанных запросов, то вы можете, например, упомянуть @ScaryWombat, использовать Future<Integer> и Callable<Integer> или использовать ExecutorCompletionService.

Фьючерсы не являются опцией, поскольку потоки исполнителей работают в замкнутом цикле, который останавливается только при отключении услуги.

Не могли бы вы объяснить это больше?

Надеюсь, это поможет.

+0

О боже, я просто был глуп. Я полностью забыл, что могу предоставить больше задач (потоков) службе-исполнителям, чем размер пула потоков, спасибо, что напомнил мне об этом. – user1071840

+1

Чтобы быть точным @ user1071840, задания - это просто Runnables, а не потоки, так что вы можете добавить 100k заданий в ваш пул потоков, который обрабатывает их только с помощью двух потоков. – Gray