Мой API составляет около 100 обратных вызовов, попарно, в два отдельных сервиса. Все ответы должны быть агрегированы, прежде чем я смогу вернуть свой ответ клиенту. Я использую hystrix-feign для совершения HTTP-вызовов.Я злоупотребляю rxJava путем преобразования наблюдаемого в блокируемое наблюдаемое?
Я придумал, что я считал элегантным решением, пока на rxJava docs я нашел следующий
BlockingObservable является разновидностью Observable, что обеспечивает блокирование операторов. Это может быть полезно для тестирования и демонстрационных целей, но, как правило, не подходит для производственных приложений (если вы считаете, что вам нужно использовать BlockingObservable, это обычно является признаком того, что вы должны переосмыслить свой дизайн).
Мой код выглядит примерно следующим образом
List<Observable<C>> observables = new ArrayList<>();
for (RequestPair request : requests) {
Observable<C> zipped = Observable.zip(
feignClientA.sendRequest(request.A()),
feignClientB.sendRequest(request.B()),
(a, b) -> new C(a,b));
observables.add(zipped);
}
Collection<D> apiResponse = = new ConcurrentLinkedQueue<>();
Observable
.merge(observables)
.toBlocking()
.forEach(combinedResponse -> apiResponse.add(doSomeWork(combinedResponse)));
return apiResponse;
Несколько вопросов, основанных на этой установке:
- Is toBlocking() обоснованным, учитывая мой случай использования
- Я правильно в понимании, что фактические HTTP-вызовы не производятся до тех пор, пока основной поток не попадет в forEach()
- Я видел, что код в блоке forEach() выполняются разными потоками, но я не смог проверить, может ли быть больше одного потока в блоке forEach(). Выполняется ли одновременное выполнение?