1

Можно ли каким-либо образом использовать ExecutorService в Java для планирования рекурсивных задач?Как планировать задачи рекурсивно с помощью ExecutorService

Пример кода: (Примечание: попытаться/поймать около Thread.sleep опущено для повышения читаемости)

final ExecutorService executor = Executors.newFixedThreadPool(4); 
for (int i = 0; i < 8; i++) { 
    executor.execute(() -> { // TASK 1 
     Thread.sleep(100); // simulate some work 

     executor.execute(() -> { // TASK2 
      Thread.sleep(500); // simulate some longer work 
     }); 

    }); 
} 

# terminate when all tasks proceeded 
executor.shutdown(); 
executor.awaitTermination(9999999, TimeUnit.DAYS); 

Проблема, как представляется, порядок исполнения:

  1. создать исполнителя
  2. выполнение расписания ЗАДАЧА 1 (8x)
  3. shutdown
  4. awaitTermination
  5. конца первого сна, а затем запланировать выполнение задачи 2
  6. FAIL, потому что после завершения работы никакой задачи не может быть выполнена.

Это, я считаю, общая проблема. Однако я не могу найти подходящего решения. Список того, что я ищу:

Требование:

  • способность exectute задач из уже запланированных задач
  • блока как-то в конце, пока все задачи не будут выполнены
  • нет активного ожидания для выполнения задач
  • фиксированный макс. количество потоков используется
  • возможно безблокировочного и поточно-

Не могли бы вы мне помочь? Я не хочу реализовывать свой собственный пул потоков, поскольку это должно быть уже реализовано многократно (желательно в стандартной библиотеке).

Спасибо

ответ

2

Ваш основной проблемой является то, что некоторые задачи могут или не могут порождать больше задач, то во время их во время выполнения. Поэтому вы должны дождаться выполнения этой задачи до ее завершения. Вы не можете использовать executor.shutdown(), иначе вы наверняка закроете бассейн преждевременно.

Необходимо выполнить какой-либо механизм хореографа, задачи которого должны ждать завершения другого, и вы должны сохранить список всех задач, которые должны быть выполнены до завершения пула.

Вот рудиментарная демонстрация того, что вам нужно будет сделать. Вероятно, вам понадобится нечто более сложное, в зависимости от требований к взаимоотношениям ваших задач.

По существу, используйте Callable s и Future<Void>.get(), чтобы дождаться выполнения любой задачи, которая должна быть завершена.

final ExecutorService executor = Executors.newFixedThreadPool(4); 

class Task1 implements Callable<Void> { 

    @Override 
    public Void call() throws Exception { 
     Thread.sleep(100); // simulate some work 
     return null; 
    } 

} 

class Task2 implements Callable<Void> { 

    final Future<Void> waitFor; 

    Task2(Future<Void> waitFor) { 
     // This task must wait for a previous task to complete before commencement. 
     this.waitFor = waitFor; 
    } 

    @Override 
    public Void call() throws Exception { 
     // Wait for the first task to complete. 
     waitFor.get(); 
     Thread.sleep(100); // simulate some work 
     return null; 
    } 

} 

public void test() throws InterruptedException { 
    // All of these tasks must complete before we close down the pool. 
    List<Future<Void>> waitFor = new ArrayList<>(); 
    for (int i = 0; i < 8; i++) { 
     Future<Void> f1 = executor.submit(new Task1()); 
     // We must wait for f1 to complete. 
     waitFor.add(f1); 
     // No need to wait for f2. 
     executor.submit(new Task2(f1)); 
    } 
    // Wait for all of the primary tasks to complete. 
    for (Future<Void> wait : waitFor) { 
     try { 
      wait.get(); 
     } catch (InterruptedException | ExecutionException ex) { 
      Logger.getLogger(Test.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 
    // Can now shut down - will wait for all sub-tasks to complete because they are all in the queue now. 
    executor.shutdown(); 
    executor.awaitTermination(9999999, TimeUnit.DAYS); 
} 
+0

Спасибо, что это хорошее решение, однако, я предпочел бы избежать заставляя пользователя осуществлять свои собственные управления зависимостями. Кроме того, количество заданий и т. Д. Неизвестно (так как они могут передаваться по сети). Я скорее ищу простое решение для пользователя, то есть используя только lambdas (а не некоторые сложные классы с зависимостями). Может быть, ваше решение может быть преобразовано таким образом? – petrbel

1

Вы можете попробовать создать ThreadPoolExecutor на основе PriorityBlockingQueue. Затем вы помещаете shutDownTask в конец Queue, который будет ждать завершения других, а затем shutDown pool.Вот пример:

import java.util.concurrent.PriorityBlockingQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

public class TestExec { 

    public static void main(String... s){ 

     PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>(10, 
       (o1,o2)-> {return o1 instanceof MyRunnable ? 1 : (o2 instanceof MyRunnable ? -1 : 0);} // shutDownTask at bottom of queue 
       ); 
     ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 
             0L, TimeUnit.MILLISECONDS, 
             queue); 

     //recursive tasks 
     Runnable task3 =() -> { 
      try { 
       Thread.sleep(300); 
      } catch (Exception e) { e.printStackTrace(); } 
      System.out.println("done task 3"); 
     }; 
     Runnable task2 =() -> { 
      try { 
       Thread.sleep(200); 
      } catch (Exception e) { e.printStackTrace(); } 
      executor.execute(task3); 
      System.out.println("done task 2"); 
     }; 
     Runnable task1 =() -> { 
      try { 
       Thread.sleep(100); 
      } catch (Exception e) { e.printStackTrace(); } 
      executor.execute(task2); 
      System.out.println("done task 1"); 
     }; 

     for (int i = 0; i < 8; i++) { 
      executor.execute(task1); 
     } 

     //shutDownTask 
     MyRunnable r =()->{ 
      while(executor.getActiveCount() > 1){ //wait until other tasks to be done 
       try { 
        Thread.sleep(100); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 
      executor.shutdown(); 
      try { 
       executor.awaitTermination(1, TimeUnit.SECONDS); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      }}; 
     executor.execute(r); 
    } 

    private static interface MyRunnable extends Runnable{ 
     // dummy Runnable 
    } 
} 
+0

Ваше решение содержит активное ожидание (определение shutDownTask), которое неприемлемо. Однако идея дополнительной закрывающей задачи кажется законной. К сожалению, я не могу развить эту идею. Неужели мы уверены, что он будет выполнен как последний? – petrbel

+0

Здесь 'while (executor.getActiveCount()> 1)' является условием активного ожидания, если в некотором случае пул будет иметь только shutDownTask в очереди и одну выполняющую задачу, которая может запускать новую одну задачу. Перед этим условием 'executer' получает реальные задачи из пула и выполняет их. – alex2410

+0

Другими словами, ожидание начинается, когда пул имеет задачи в очереди меньше количества потоков пулов. – alex2410

0

Одним из вариантов может быть использование метода afterExecute и изменить поведение метода shutdown. Вот небольшая работоспособной демонстрационная программа:

import java.util.concurrent.*; 

public class TpShutdownAfterTasksDone extends ThreadPoolExecutor { 

    private volatile boolean shutdown; 
    private final CountDownLatch preShutdown = new CountDownLatch(1); 

    public TpShutdownAfterTasksDone() { 
     this(0, Integer.MAX_VALUE, 
       60L, TimeUnit.SECONDS, 
       new SynchronousQueue<Runnable>()); 
     // Copied from 
     // Executors.newCachedThreadPool(); 
    } 

    public TpShutdownAfterTasksDone(int corePoolSize, int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
      BlockingQueue<Runnable> workQueue) { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 
    } 

    @Override 
    public void shutdown() { 

     shutdown = true; 
     if (getActiveCount() < 1 && getQueue().size() < 1) { 
      println("Immediate shutdown"); 
      realShutdown(); 
     } 
    } 

    protected void realShutdown() { 

     println("Real shutdown"); 
     super.shutdown(); 
     preShutdown.countDown(); 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 

     super.afterExecute(r, t); 
     if (shutdown && getActiveCount() <= 1 && getQueue().size() < 1) { 
      // see http://stackoverflow.com/q/21277746/3080094 
      // active count is still 1 when this method is executed after the last task. 
      println("AfterExecute shutdown"); 
      realShutdown(); 
     } else { 
      println("Tasks: " + getActiveCount()); 
     } 
    } 

    @Override 
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 

     if (!preShutdown.await(timeout, unit)) { 
      return false; 
     } 
     // TODO: calculate remaining timeout period 
     return super.awaitTermination(timeout, unit); 
    } 

    public static void main(String[] args) { 

     ThreadPoolExecutor tp = new TpShutdownAfterTasksDone(); 
     try { 
      println("Executing test runnable."); 
      tp.execute(new TestRunRunnable(tp)); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      println("Shutting tp down."); 
      tp.shutdown(); 
      try { 
       println("Awaiting tp termination."); 
       if (tp.awaitTermination(2L, TimeUnit.SECONDS)) { 
        println("Tp terminated."); 
       } else { 
        throw new RuntimeException("Tp did not terminate."); 
       } 
      } catch (Exception e) { 
       e.printStackTrace(); 
       tp.shutdownNow(); 
      } 
     } 

    } 

    static class TestRunRunnable implements Runnable { 

     ThreadPoolExecutor tp; 
     public TestRunRunnable(ThreadPoolExecutor tp) { 
      this.tp = tp; 
     } 
     @Override 
     public void run() { 

      sleep(100); 
      tp.execute(new Runnable() { 
       @Override 
       public void run() { 
        sleep(500); 
       } 
      }); 
     } 
    } 

    private static void sleep(int time) { 

     try { 
      println("Sleeping " + time); 
      Thread.sleep(time); 
      println("Slept " + time); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 

    private static final long START_TIME = System.currentTimeMillis(); 

    private static void println(String msg) { 
     System.out.println((System.currentTimeMillis() - START_TIME) + "\t " + msg); 
    } 

} 

Выход что-то вроде:

 
2 Executing test runnable. 
3 Shutting tp down. 
3 Sleeping 100 
3 Awaiting tp termination. 
103 Slept 100 
103 Tasks: 2 
104 Sleeping 500 
604 Slept 500 
604 AfterExecute shutdown 
604 Real shutdown 
604 Tp terminated.