2014-12-19 5 views
2

Я использую Reactor 2.0.0.M1, и я пытаюсь отфильтровать Stream. В зависимости от результатов моей логической операции я хочу либо продолжить с тем или иным потоком. Это представляется возможным с помощью функции otherwise(), но не совсем понятно, как ее использовать.Каков правильный способ обработки фильтров Stream в Reactor 2.0?

Мой поток выглядит следующим образом:

stream.filter(o -> o.isValid()); 

Чтобы обработать случай, когда o.isValid() верно, мое понимание, я могу просто позвонить .map() продолжить вниз по течению.

Чтобы обработать случай, когда o.isValid() является ложным, я могу получить доступ к альтернативному потоку .otherwise().

Но, похоже, не существует or() или аналогичного метода, поэтому невозможно настроить оба потока в полностью текущем режиме.

Лучшее, что я могу придумать что-то вроде этого:

FilterAction<Object> filterAction = stream.filter(o -> o.isValid()); 

// Returns a 'true' Stream, which might additional operations 
filterAction 
    .map(o -> trueOperation1(o)) 
    .map(o -> trueOperation2(o)); 

// Returns a 'false' Stream, which might different additional operations 
filterAction.otherwise() 
    .map(o -> falseOperation1(o)) 
    .map(o -> falseOperation2(o)); 

Это действительно лучший способ, чтобы продолжить?

ответ

3

Я решил эту проблему с помощью группеПо() и flatMap().

Вот пример:

// your initial stream 
Broadcaster<Object> stream = Streams.<Object>broadcast(environment); 

stream 
    .groupBy(o -> o.isValid()) 
    .flatMap(groupedStream -> { 
     if (groupedStream.key()) { 
      return groupedStream.map(o -> trueOperation(o)); 
     } else { 
      return groupedStream.map(o -> falseOperation(o)); 
    } 
    .map(o -> additionalOperations(); 

Что здесь происходит, что группеПо() преобразует поток в Stream<GroupedStream<O>>. Другими словами, поток потоков объектов. Каждый внутренний поток содержит группу объектов, которые были развернуты операцией в вызове groupBy(). В моем случае я отфильтровал объекты в true и false ведрах.

Далее, flatMap() принимает несколько потоков, обрабатывает их и затем выравнивает выходные данные в один Stream<Object>. В пределах flatMap() вы можете проверить ключ Stream() и выполнить дополнительные операции над потоком на основе ключа().

После завершения работы flatMap() у вас снова есть Stream и вы можете выполнять любую последующую обработку.

0

Похоже, что вы хотите

stream.filter(o -> { 
    if (o.isValid()) { 
    return trueOperation(o); 
    } else { 
    return falseOperation(o); 
    } 
}); 
+0

Это имеет смысл только в том случае, если остальная часть потока остается прежней. Что вы будете делать, когда поведение потока зависит от результата фильтра? – JBCP