2016-07-27 8 views
1

Я хочу асинхронно извлекать данные через несколько REST API. Я использую Retrofit на Android с расширением rxJava, т. Е. Я выполняю любой запрос GET, подписываясь на Observable.Условная цепочка наблюдаемых

Как я уже говорил, у меня есть несколько исходных API, поэтому, когда первый источник не дает желаемого результата, я хочу попробовать следующее, если это тоже не удастся, снова попробуйте следующее и так далее, пока все источники не будут были запрошены или результат был найден.

Я изо всех сил, чтобы перевести этот подход в надлежащее использование наблюдаемых, так как я не знаю, какие операторы могут достичь такого поведения, и есть также некоторые ограничения в честь:

  • когда результат был найден , остальные API-интерфейсы, если таковые имеются, не должны запрашиваться
  • другие компоненты зависят от результата запроса, я хочу, чтобы они получили Observable при запуске запроса, поэтому этот Observable может уведомить их о завершении запроса
  • Мне нужно сохранить ссылку на вышеупомянутое Наблюдаемое, потому что тот же запрос c возможно, сделать это несколько раз, прежде чем он закончится, в этом случае я только начинаю его в первый раз, когда он нужен, и последующие запросы получают только Observable, который уведомляет, когда запрос завершен.

Я начинал с только один API для запроса и используется следующее для запроса и последующего уведомления зависимых компонентов:

private Observable<String> loadData(int jobId) { 

    final ConnectableObservable<String> result = Async 
      .fromCallable(() -> getResult(jobId)) 
      .publish(); 

    getRestRequest() 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(
        dataHolder -> { 
         if (dataHolder.getData() != null && !dataHolder.getData().isEmpty()) { 
          saveData(dataHolder.getData()); 
         } else { 
          markNotFound(dataHolder); 
         } 
        }, 
        error -> currentJobs.remove(jobId), 
        () -> { 
         currentJobs.remove(jobId); 
         result.connect(); 
        }); 

    return result; 
} 

Этот код был вызван только для первого запроса, возвращаемый Наблюдаемый результат затем будет сохранен в currentJobs и последующие запросы будут только получить Observable без повторного запроса запроса.

Любая помощь очень ценится.

ответ

0

Предполагая, что вы набор наблюдаемых, повторно подключить каждый раз, когда вы подписываться:

List<Observable<Result>> suppliers = ... 

Тогда вам просто нужно сделать логическую вещь:

Observable<Result> results = Observable 
      .from(suppliers) 
      .concatMap(supplier -> supplier) 
      .takeFirst(result -> isAcceptable(result)) 
      .cache() 
+0

Я думаю, что это путь. Я нашел аналогичный подход [здесь] (http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/). Есть ли какая-либо функциональная разница между concat() и from()? В связанном блоге они подчеркивают, что с concat() подписка выполняется только тогда, когда это необходимо, применимо ли это для from()? Какую цель выполняет concatMap()? – DerBob

+0

'from' просто создает наблюдаемый из определенного источника. 'concat' берет несколько Observables и начинает получать предметы из следующего, только когда предыдущий закончен. 'concatMap' идет немного дальше (не точно равно concat + map). –

0

Использование .onErrorResumeNext, и при условии, что каждая наблюдаемая услуга может возвращать 0 или 1 элементы, используя first, для испускания ошибки, если не испускаются элементы:

Observable<T> a, b, c; 
... 
a.first().onErrorResumeNext(t -> b.first()) 
.onErrorResumeNext(t -> c.first()) 
.onErrorResumeNext(t -> d.first()) 
... 
+0

Когда один из запросов заканчивается без поиска запрашиваемых данных, он возвращается с помощью emtpy dataHolder. Это не вызывает ошибки, поэтому такой подход не будет работать по желанию. – DerBob

+0

Я обновляю ответ, используя 'first' для обработки вашего дополнительного требования –