2016-11-23 15 views
0

Мой API составляет около 100 обратных вызовов, попарно, в два отдельных сервиса. Все ответы должны быть агрегированы, прежде чем я смогу вернуть свой ответ клиенту. Я использую hystrix-feign для совершения HTTP-вызовов.Я злоупотребляю rxJava путем преобразования наблюдаемого в блокируемое наблюдаемое?

Я придумал, что я считал элегантным решением, пока на rxJava docs я нашел следующий

BlockingObservable является разновидностью Observable, что обеспечивает блокирование операторов. Это может быть полезно для тестирования и демонстрационных целей, но, как правило, не подходит для производственных приложений (если вы считаете, что вам нужно использовать BlockingObservable, это обычно является признаком того, что вы должны переосмыслить свой дизайн).

Мой код выглядит примерно следующим образом

List<Observable<C>> observables = new ArrayList<>(); 
for (RequestPair request : requests) { 
    Observable<C> zipped = Observable.zip(
     feignClientA.sendRequest(request.A()), 
     feignClientB.sendRequest(request.B()), 
     (a, b) -> new C(a,b)); 
    observables.add(zipped); 
} 

Collection<D> apiResponse = = new ConcurrentLinkedQueue<>(); 

Observable 
    .merge(observables) 
    .toBlocking() 
    .forEach(combinedResponse -> apiResponse.add(doSomeWork(combinedResponse))); 

return apiResponse; 

Несколько вопросов, основанных на этой установке:

  1. Is toBlocking() обоснованным, учитывая мой случай использования
  2. Я правильно в понимании, что фактические HTTP-вызовы не производятся до тех пор, пока основной поток не попадет в forEach()
  3. Я видел, что код в блоке forEach() выполняются разными потоками, но я не смог проверить, может ли быть больше одного потока в блоке forEach(). Выполняется ли одновременное выполнение?

ответ

1

Лучшим вариантом является вернуть Observable потребляться другими операторами, но вы можете уйти с блокировкой кода (Следует, однако, работать в фоновом потоке.)

public Observable<D> getAll(Iterable<RequestPair> requests) { 
    return Observable.from(requests) 
    .flatMap(request -> 
     Observable.zip(
      feignClientA.sendRequest(request.A()), 
      feignClientB.sendRequest(request.B()), 
      (a, b) -> new C(a,b) 
     ) 
    , 8) // maximum concurrent HTTP requests 
    .map(both -> doSomeWork(both)); 
} 

// for legacy users of the API 
public Collection<D> getAllBlocking(Iterable<RequestPair> requests) { 
    return getAll(requests) 
     .toList() 
     .toBlocking() 
     .first(); 
} 

Я правильно в понимании того, что реальные вызовы HTTP не получают сделано, пока основной поток не попадает в Foreach()

Да, forEach запускает всю последовательность Operati дополнения.

Я видел, что код в блоке forEach() выполняется разными потоками, но я не смог проверить, может ли быть больше одного потока в блоке forEach(). Выполняется ли одновременное выполнение?

Только один поток за один раз разрешен для выполнения лямбда в forEach, но вы действительно можете увидеть различные темы, входящие туда.