2013-07-31 5 views
21

Я играл с CompletionStage/CompletableFuture в Java 8, чтобы выполнить асинхронную обработку, которая работает достаточно хорошо. Однако иногда мне нужен этап для выполнения асинхронной обработки итератора/потока элементов, и, похоже, не существует способа сделать это.Можно ли использовать Java 8 Streams API для асинхронной обработки?

В частности, Stream.forEach() имеет семантику, которая после вызова всех элементов была обработана. Я хотел бы то же самое, но с CompletionStage вместо, например .:

CompletionStage<Void> done = stream.forEach(...); 
done.thenRun(...); 

Если поток опирается на асинхронный результат потоковой это будет намного лучше, чем ждать, чтобы быть полным в самом коде выше.

Возможно ли построить это с помощью Java API Java как-то? Обходные?

+0

Привет! Я обновил свой ответ. Если это вас удовлетворит, проголосуйте, я пытаюсь заработать больше репутации;) –

+2

@ Rickard Может быть, слишком поздно :), но я нашел что-то, что вы хотите, если я правильно вас пойму. http://www.reactive-streams.org/. https://github.com/reactor/reactor –

ответ

20

Насколько я знаю, API потоков не поддерживает обработку асинхронных событий. Похоже, вы хотите что-то вроде Reactive Extensions для .NET, и есть порт Java, который называется RxJava, созданный Netflix.

RxJava поддерживает многие из тех же операций высокого уровня, что и потоки Java 8 (такие как карта и фильтр), и является асинхронным.

Update: Существует в настоящее время reactive streams инициативы в работе, и это выглядит как JDK 9 будет включать в себя поддержку, по крайней мере, часть его, хотя Flow класса.

+19

Действительно, RxJava - это то, что вы хотите. Центр проектирования для Streams в основном касается данных, к которым можно получить доступ без латентности (либо от структур данных, либо от генерирующих функций); дизайн-центр Rx - это бесконечные потоки событий, которые могут поступать асинхронно. –

7

Поскольку @KarolKrol ссылается на вас, вы можете сделать это с потоком CompletableFuture.

Существует библиотека, которая построена поверх потоков JDK8, чтобы облегчить работу с потоками CompletableFuture под названием cyclops-react.

Чтобы создать свои потоки, вы можете использовать свободный API обещаний cyclops-реагировать или вы можете использовать Stages simple-react.

+0

Я забыл проголосовать в эти дни :) –

+0

@ KarolKról вместо лоббирования upvotes, просто напишите наилучшие ответы на большее количество вопросов. –

2

cyclops-react (Я являюсь автором этой библиотеки) предоставляет класс StreamUtils для обработки потоков. Одной из функций, которые он предоставляет, является futureOperations, которая обеспечивает доступ к стандартным операциям терминала Stream (а затем к некоторым) с помощью твиста - Stream выполняется асинхронно, и результат возвращается внутри CompletableFuture. .eg

Stream<Integer> stream = Stream.of(1,2,3,4,5,6) 
             .map(i->i+2); 
CompletableFuture<List<Integer>> asyncResult = StreamUtils.futureOperations(stream, 
              Executors.newFixedThreadPool(1)) 
             .collect(Collectors.toList()); 

Существует также класс ReactiveSeq convience, который оборачивает поток и обеспечивает ту же функциональность, с хорошим беглым API

CompletableFuture<List<Integer>> asyncResult = ReactiveSeq.of(1,2,3,4,5,6) 
             .map(i->i+2) 
             .futureOperations(
              Executors.newFixedThreadPool(1)) 
             .collect(Collectors.toList()); 

Как Адам отметил cyclops-react FutureStreams предназначен для обработки данных асинхронно (путем комбинирования фьючерсов и потоков вместе) - он особенно подходит для многопоточных операций, которые включают блокировку ввода-вывода (например, чтение файлов, создание вызовов в режиме «db», вызов отдыха и т. д.).

1

Можно создать поток, нанести на карту каждый элемент до CompletionStage и собрать результаты с помощью CompletionStage.thenCombine(), но полученный код не будет более читаемым, а затем простым способом для этого.

CompletionStage<Collection<Result>> collectionStage = 
     CompletableFuture.completedFuture(
      new LinkedList<>() 
     ); 

    for (Request request : requests) { 
     CompletionStage<Result> resultStage = performRequest(request); 
     collectionStage = collectionStage.thenCombine(
      resultStage, 
      (collection, result) -> { 
       collection.add(result); 
       return collection; 
      } 
     ); 
    } 

    return collectionStage; 

Этот пример может быть легко преобразован в функциональный для каждого, не теряя удобочитаемости. Но с использованием потока reduce или collect требуется дополнительный не очень тонкий код.

Обновление: CompletableFuture.allOf и CompletableFuture.join предоставляют другой, более читаемый способ преобразования коллекции будущих результатов в будущую коллекцию результатов.