2010-01-29 5 views
19

У меня есть код, который отправляет запрос другому потоку, который может или не может отправить этот запрос еще одному потоку. Это дает возвращаемый тип Future<Future<T>>. Есть ли какой-то отвратительный способ немедленно превратить это в Future<T>, который ждет завершения всей будущей цепочки?Есть ли простой способ превратить будущее <Future<T>> в будущее <T>?

Я уже пользуюсь библиотекой Guava для обработки других материалов для совместной работы и замены Google Collections и ее работы, но я не могу найти что-то для этого случая.

+4

Было бы полезно, если бы вы могли добавьте немного больше контекста. Очевидным ответом является вызов get(), но это, вероятно, не то, что вы хотите. –

+0

Готово. Извините, не было ясно. – Nik

+0

Кажется, что вам нужен Monad – user

ответ

5

Guava 13.0 добавляет Futures.dereference для этого. Для этого требуется ListenableFuture<ListenableFuture>, а не просто Future<Future>. (Для работы на равнине Future потребуется вызов makeListenable, для каждого из которых требуется выделенный поток для срока действия задачи (как это стало яснее по новому имени метода, JdkFutureAdapters.listenInPoolThread).

0

Вы могли бы создать класс, как:

public class UnwrapFuture<T> implements Future<T> { 
    Future<Future<T>> wrappedFuture; 

    public UnwrapFuture(Future<Future<T>> wrappedFuture) { 
     this.wrappedFuture = wrappedFuture; 
    } 

    public boolean cancel(boolean mayInterruptIfRunning) { 
     try { 
      return wrappedFuture.get().cancel(mayInterruptIfRunning); 
     } catch (InterruptedException e) { 
      //todo: do something 
     } catch (ExecutionException e) { 
      //todo: do something 
     } 
    } 
    ... 
} 

Вы будете иметь дело с исключениями, которые получают() может поднять, но и другие методы не могут.

+1

Thats в значительной степени то, чего я пытался избежать. Кроме того, что метод отмены, который у вас есть, заставит отменить ожидание, пока не будет выполнено первое будущее в цепочке. Это определенно не то, что я ищу. – Nik

+2

«превратите это в будущее , которое ждет завершения всей будущей цепочки?» ... Я не думаю, что вы можете отменить второе будущее, пока не овладеете им. Но вы не можете получить его, пока первое будущее не вернет его. – Dave

+0

Хороший улов. В то время как второе будущее создается первым, я уверен, что вы можете попасть в состояние, в котором вы отменили первое будущее, но оно все равно делает второй, и вы не можете его отменить. Бьюсь об заклад, вы можете исправить это с помощью «Futures.makeListenable» в первом будущем и добавить слушателя, который немедленно отменяет прикованное будущее при возврате. Тогда проблема будет протестирована для этого случая. – Nik

0

Это был мой первый удар, но я уверен, что в этом много ошибок. Я был бы более чем счастлив просто заменить его чем-то вроде Futures.compress(f).

public class CompressedFuture<T> implements Future<T> { 
    private final Future<Future<T>> delegate; 

    public CompressedFuture(Future<Future<T>> delegate) { 
     this.delegate = delegate; 
    } 

    @Override 
    public boolean cancel(boolean mayInterruptIfRunning) { 
     if (delegate.isDone()) { 
      return delegate.cancel(mayInterruptIfRunning); 
     } 
     try { 
      return delegate.get().cancel(mayInterruptIfRunning); 
     } catch (InterruptedException e) { 
      throw new RuntimeException("Error fetching a finished future", e); 
     } catch (ExecutionException e) { 
      throw new RuntimeException("Error fetching a finished future", e); 
     } 
    } 

    @Override 
    public T get() throws InterruptedException, ExecutionException { 
     return delegate.get().get(); 
    } 

    @Override 
    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 
     long endTime = System.currentTimeMillis() + unit.toMillis(timeout); 
     Future<T> next = delegate.get(timeout, unit); 
     return next.get(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); 
    } 

    @Override 
    public boolean isCancelled() { 
     if (!delegate.isDone()) { 
      return delegate.isCancelled(); 
     } 
     try { 
      return delegate.get().isCancelled(); 
     } catch (InterruptedException e) { 
      throw new RuntimeException("Error fetching a finished future", e); 
     } catch (ExecutionException e) { 
      throw new RuntimeException("Error fetching a finished future", e); 
     } 
    } 

    @Override 
    public boolean isDone() { 
     if (!delegate.isDone()) { 
      return false; 
     } 
     try { 
      return delegate.get().isDone(); 
     } catch (InterruptedException e) { 
      throw new RuntimeException("Error fetching a finished future", e); 
     } catch (ExecutionException e) { 
      throw new RuntimeException("Error fetching a finished future", e); 
     } 
    } 
} 
1

Я думаю, что это лучшее, что можно сделать для реализации контракта Future. Я придерживался того, чтобы быть как можно более нескромным, чтобы быть уверенным, что он соответствует контракту. Не особенно реализация get с таймаутом.

import java.util.concurrent.*; 

public class Futures { 
    public <T> Future<T> flatten(Future<Future<T>> future) { 
    return new FlattenedFuture<T>(future); 
    } 

    private static class FlattenedFuture<T> implements Future<T> { 
    private final Future<Future<T>> future; 

    public FlattenedFuture(Future<Future<T>> future) { 
     this.future = future; 
    } 

    public boolean cancel(boolean mayInterruptIfRunning) { 
     if (!future.isDone()) { 
     return future.cancel(mayInterruptIfRunning); 
     } else { 
     while (true) { 
      try { 
      return future.get().cancel(mayInterruptIfRunning); 
      } catch (CancellationException ce) { 
      return true; 
      } catch (ExecutionException ee) { 
      return false; 
      } catch (InterruptedException ie) { 
      // pass 
      } 
     } 
     } 
    } 

    public T get() throws InterruptedException, 
          CancellationException, 
          ExecutionException 
    { 
     return future.get().get(); 
    } 

    public T get(long timeout, TimeUnit unit) throws InterruptedException, 
                CancellationException, 
                ExecutionException, 
                TimeoutException 
    { 
     if (future.isDone()) { 
     return future.get().get(timeout, unit); 
     } else { 
     return future.get(timeout, unit).get(0, TimeUnit.SECONDS); 
     } 
    } 

    public boolean isCancelled() { 
     while (true) { 
     try { 
      return future.isCancelled() || future.get().isCancelled(); 
     } catch (CancellationException ce) { 
      return true; 
     } catch (ExecutionException ee) { 
      return false; 
     } catch (InterruptedException ie) { 
      // pass 
     } 
     } 
    } 

    public boolean isDone() { 
     return future.isDone() && innerIsDone(); 
    } 

    private boolean innerIsDone() { 
     while (true) { 
     try { 
      return future.get().isDone(); 
     } catch (CancellationException ce) { 
      return true; 
     } catch (ExecutionException ee) { 
      return true; 
     } catch (InterruptedException ie) { 
      // pass 
     } 
     } 
    } 
    } 
} 
7

Другая возможная реализация, которая использует библиотеки guava и намного проще.

import java.util.concurrent.*; 
import com.google.common.util.concurrent.*; 
import com.google.common.base.*; 

public class FFutures { 
    public <T> Future<T> flatten(Future<Future<T>> future) { 
    return Futures.chain(Futures.makeListenable(future), new Function<Future<T>, ListenableFuture<T>>() { 
     public ListenableFuture<T> apply(Future<T> f) { 
     return Futures.makeListenable(f); 
     } 
    }); 
    } 
} 
+0

Похоже, он сделает это и позвольте мне делегировать всю будущую передачу гуаве. – Nik