2016-05-25 3 views
4

Не так давно я ответил на этот вопрос: Executing Dependent tasks in parallel in Java Но использование future.get() блокирует текущий поток, и есть вероятность, что пул потоков заканчивается из потоков, если слишком много gets() вызывается в одно время. Как составить фьючерсы на фьючерсы на Java?Как выполнять зависимые задачи в Java 8 без блокировки

+0

Обычно «.get()» является барьером. Вы не можете рассчитывать, если у вас нет результата. –

+0

Да, но вы можете создавать фьючерсы с фьючерсов, см. Ответ. – Snickers3192

+0

Посмотрите на https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html. Он может выполнять последовательное выполнение. –

ответ

3

Я думал, что сам отвечу на этот вопрос, можно использовать CompletableFutures в java вместо Futures. CompletedFutures позволяют создавать композиции с помощью метода thenCombine, который аналогичен scalas flatMap. Теперь нет блокировки, и для достижения наивысшего времени требуется только 3 потока.

import java.util.concurrent.CompletableFuture; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.function.BiFunction; 
import java.util.function.Supplier; 

public class Barrista 
{ 

    // number of threads used in executor 
    static final int NOTHREADS = 3; 

    // time of each task 
    static final int HEATWATER = 1000; 
    static final int GRINDBEANS = 1000; 
    static final int FROTHMILK = 1000; 
    static final int BREWING = 1000; 
    static final int COMBINE = 1000; 

    // method to simulate work (pause current thread without throwing checked exception) 
    public static void pause(long t) 
    { 
     try 
     { 
      Thread.sleep(t); 
     } 
     catch(Exception e) 
     { 
      throw new Error(e.toString()); 
     } 
    } 

    // task to heat some water 
    static class HeatWater implements Supplier<String> 
    { 
     @Override 
     public String get() 
     { 
      System.out.println("Heating Water"); 
      pause(HEATWATER); 
      return "hot water"; 
     } 
    } 

    // task to grind some beans 
    static class GrindBeans implements Supplier<String> 
    { 
     @Override 
     public String get() 
     { 
      System.out.println("Grinding Beans"); 
      pause(GRINDBEANS); 
      return "grinded beans"; 
     } 
    } 

    // task to froth some milk 
    static class FrothMilk implements Supplier<String> 
    { 
     @Override 
     public String get() 
     { 
      System.out.println("Frothing some milk"); 
      pause(FROTHMILK); 
      return "some milk"; 
     } 
    } 

    // task to brew some coffee 
    static class Brew implements BiFunction<String,String, String> 
    { 
     @Override 
     public String apply(String groundBeans, String heatedWater) 
     { 
      System.out.println("Brewing coffee with " + groundBeans + " and " + heatedWater); 
      pause(BREWING); 
      return "brewed coffee"; 
     } 
    } 

    // task to combine brewed coffee and milk 
    static class Combine implements BiFunction<String,String, String> 
    { 
     @Override 
     public String apply(String frothedMilk, String brewedCoffee) 
     { 
      System.out.println("Combining " + frothedMilk + " "+ brewedCoffee); 
      pause(COMBINE); 
      return "Final Coffee"; 
     } 
    } 

    public static void main(String[] args) 
    { 
     ExecutorService executor = Executors.newFixedThreadPool(NOTHREADS); 

     long startTime = System.currentTimeMillis(); 

     try 
     { 
      // create all the tasks and let the executor handle the execution order 
      CompletableFuture<String> frothMilk =  CompletableFuture.supplyAsync(new FrothMilk(), executor); 
      CompletableFuture<String> heatWaterFuture = CompletableFuture.supplyAsync(new HeatWater(), executor); 
      CompletableFuture<String> grindBeans =  CompletableFuture.supplyAsync(new GrindBeans(), executor); 

      CompletableFuture<String> brew = heatWaterFuture.thenCombine(grindBeans, new Brew()); 
      CompletableFuture<String> coffee =   brew.thenCombine(frothMilk, new Combine()); 

      // final coffee 
      System.out.println("Here is the coffee:" + coffee.get()); 

      // analyzing times: 
      System.out.println("\n\n"); 
      System.out.println("Actual time: \t\t\t\t" + (System.currentTimeMillis() - startTime)/1000.0); 

      // compute the quickest possible time: 
      long path1 = Math.max(GRINDBEANS, HEATWATER)+ BREWING + COMBINE; 
      long path2 = FROTHMILK + COMBINE; 
      System.out.println("Quickest time multi-threaded:\t\t" + Math.max(path1, path2)/1000.0); 

      // compute the longest possible time: 
      long longestTime = HEATWATER + GRINDBEANS + FROTHMILK + BREWING + COMBINE; 
      System.out.println("Quickest time single-threaded thread:\t" + longestTime/1000.0); 
     } 
     catch (Exception e) 
     { 
      e.printStackTrace(); 
     } 
     finally 
     { 
      executor.shutdown(); 
     } 

    } 
} 
2

Java 8 вводит CompletableFuture, где вам не требуется особо блокировать get вызова, за исключением того, вы вызвать функцию обратного вызова зависит от стадии завершения.

Будущее, которое может быть явно завершено (установив его значение и статуса), и может быть использован в качестве CompletionStage, поддерживая зависимые функций и действия, которые вызывают по его завершению.

Подробнее о documentation

Перед Java 8, эта концепция доступна с Google заводной библиотека, читать больше на documentation и яровой библиотеки too.

+0

Спасибо, что я использовал их в своем ответе. У меня как-то был ответ на этот вопрос, когда я задавал вопрос, но я думал, что это будет полезно людям. Мне очень трудно найти, как это сделать. – Snickers3192