2017-02-10 15 views
0

Я использую rxjava для параллельной обработки двух запросов с использованием Observable.zip. То, что я пытаюсь сделать, в одном observable say response Я получаю один ответ, а в другом observable say diff Я пытаюсь получить ответ и сохранить эту разницу в БД. Проблема в том, что я не знаю, как достичь мои требования как diff observable не получает завершенным, если response observable получает ответrxjava - получение ответа и вставка diff параллельно

Вот что я делаю ...

public ServiceResponse getDummyResponse(ServiceRequest serviceRequest, String prodId){ 
    Observable<ServiceResponse> subInfoDummyObservable = getDummyResonseGenericObservable(); 
    Observable<ServicesDiff> reObservable = getServicesDiffGenericObservable(serviceRequest, prodId); 

    Observable<ServiceResponse> responseObservable = Observable.zip(
      subInfoDummyObservable, 
      reObservable, 
      new Func2<ServiceResponse, ServicesDiff, ServiceResponse>() { 
       @Override 
       public ServiceResponse call(ServiceResponse serviceResponse, ServicesDiff diffResponse) { 
        return serviceResponse; 
       } 
      } 
    ); 

    ServiceResponse serviceResponse = responseObservable.toBlocking().single(); 

    return serviceResponse; 
} 

Observable<ServiceResponse> getDummyResonseGenericObservable() { 
    return GenericHystrixCommand.toObservable("getDummyResonseGenericObservable", "getDummyResonseGenericObservable",() -> new ServiceResponse(),(t) -> {return null;}); 
} 

Observable<ServicesDiff> getServicesDiffGenericObservable(ServiceRequest serviceRequest, String prodId) { 
    return GenericHystrixCommand.toObservable("getServicesDiffGenericObservable", "getServicesDiffGenericObservable",() -> getBothServiceResponses(serviceRequest, prodId),(t) -> {return null;}); 
} 

public ServicesDiff getBothServiceResponses(ServiceRequest serviceRequest, String prodId) { 
    Observable<String> service1ResponseObservable = getService1GenericObservable(prodId); 
    Observable<ServiceResponse> service2ResponseObservable = getService2GenericObservable(serviceRequest, prodId); 

    Observable<ServicesDiff> observable = Observable.zip(
      service1ResponseObservable, service2ResponseObservable, 
      new Func2<String, ServiceResponse, ServicesDiff>() { 
       @Override 
       public ServicesDiff call(String service1Response, ServiceResponse service2Response) { 
        return aggregate(service1Response, service2Response); // never reaches this point********** 
       } 
      } 
    ); 
    ServicesDiff response = observable.toBlocking().single(); 

    return response; 
} 

Я вставив диф в DB в методе aggregate, но он никогда не достигает aggregate. Пожалуйста, дайте мне знать, что я делаю неправильно здесь? Благодарю.

+0

ваш образец кода не ясно мне, что связь между методом getBothServiceResponses, где находится проблема, с остальной частью кода? Какие две наблюдаемые вы зажимаете в getBothServiceResponses? – yosriz

+0

Согласен, код нуждается в уточнении. первые три метода никогда не вызываются, поэтому мы не знаем, как выглядят наблюдаемые в 4-м методе 'getBothServiceResponses()', где ваша проблема лежит – nosyjoe

ответ

0

Observable - описание способов использования данных. В вашем примере кода вы не подписываетесь, вы фактически не потребляете данные. Вы просто описали, как запросить, но часть подписки, часть, которая запускает запросы, отсутствует.

Так что, если я переписать немного код:

class Aggregate { 
    Aggregate(String reponse, ServicesDiff diff) { 
     ... 
    } 
} 

Observable<String> getService1GenericObservable(String prodId) { 
    ... 
} 

Observable<ServicesDiff> getServicesDiffGenericObservable(ServiceRequest serviceRequest, String prodId) { 
    ... 
} 

public Observable<Aggregate> getBothServiceResponses(ServiceRequest serviceRequest, String prodId) { 
    Observable<String> service1ResponseObservable = getService1GenericObservable(prodId); 
    Observable<ServiceResponse> service2ResponseObservable = getService2GenericObservable(serviceRequest, prodId); 

    return Observable<Aggregate> observable = Observable.zip(
      service1ResponseObservable, service2ResponseObservable, 
      new Func2<String, ServiceResponse, ServicesDiff>() { 
       @Override 
       public ServicesDiff call(String service1Response, ServiceResponse service2Response) { 
        return aggregate(service1Response, service2Response); 
       } 
      } 
    ); 
} 

Вы просто должны это сделать, чтобы получить доступ результата агрегата:

getBothServiceResponses(serviceRequest, prodId).subscribe(...)