2017-02-12 13 views
0

Проблема, с которой я столкнулся, выглядит следующим образом: У меня есть два наблюдаемых, которые извлекают данные из сети, а другие из db. Второй может быть пустым, но отсутствие первого считается ошибкой. Затем, если результат получается из сети, мне нужно сравнить его с последними результатами из db (если они есть), и если они отличаются, я хочу их хранить (если db наблюдаемый пуст, я все равно хочу сохранить результаты сети).Как фильтровать наблюдаемые данные с другими наблюдаемыми объектами

Есть ли специальный оператор, который обрабатывает такой случай?

До сих пор я пробовал решение с zipWith (который работает не так, как ожидалось, если db пуст), буфер (который работает, но далек от идеала), , и я также попробовал flatmapping (что требует дополнительной кастинга в абонент).

Ниже приведено решение с буфером.

Observable.concat(ratesFromNetwork(), latestRatesFromDB()) 
       .buffer(3000, 2) 
       .filter(buffer -> !(buffer.size() == 2 && !buffer.get(0).differentThan(buffer.get(1)))) 
       .map(buffer -> buffer.get(0)) 
       .subscribe(this::save, 
         (ex) -> System.out.println(ex.getMessage()), 
         () -> System.out.println("completed")); 

Если я изменяю latestRatesFromDb так, чтобы он не возвращался Observable, но и дополнительные вместо этого вся проблема становится тривиальным, потому что я могу фильтровать с помощью этого результата. Это швы, что нет возможности фильтровать асинхронным способом (или я что-то пропустил?)

+0

Что излучают эти наблюдаемые? только один элемент или несколько? – flakes

+0

Только одно событие. – Luke

ответ

1

Хорошо, вот как я хотел бы это написать.

Во-первых, любой класс имеет функцию differentThan, вместо этого следует заменить equals. В противном случае с этими объектами нельзя использовать множество базовых методов.

Для целей этого примера я написал все наблюдаемые, используя класс Integer в качестве параметра моего типа. Затем я использую планировщик написать две фиктивные методы:

static Observable<Integer> ratesFromNetwork(Scheduler scheduler) { 
    return Observable.<Integer>create(sub -> { 
     sub.onNext(2); 
     sub.onCompleted(); 
    }).delay(99, TimeUnit.MILLISECONDS, scheduler); 
} 

static Observable<Integer> latestRatesFromDB(Scheduler scheduler) { 
    return Observable.<Integer>create(sub -> { 
     sub.onNext(1); 
     sub.onCompleted(); 
    }).delay(99, TimeUnit.MILLISECONDS, scheduler); 
} 

Как вы можете видеть оба похожи, однако, они будут излучать различные значения.

отсутствия первого считаются ошибкой

Лучшим способом для достижения этой цели является использование timeout. Вы можете зарегистрировать ошибку сразу здесь и продолжить:

final Observable<Integer> networkRate = ratesFromNetwork(scheduler) 
    .timeout(networkTimeOut, TimeUnit.MILLISECONDS, scheduler) 
    .doOnError(e -> System.err.println("Failed to get rates from network.")); 

Когда timeout терпит неудачу ошибка будет брошена гм. doOnError даст вам лучшее представление о том, где эта ошибка началась и пусть она распространяется через остальную часть последовательности.

Второй может быть пустым

В этом случае я хотел бы сделать подобную стратегию, однако, не позволяют ошибки распространяться с помощью метода onErrorResumeNext. Теперь вы можете убедиться, что наблюдаемый испускает хотя бы одно значение, используя firstOrDefault. В этом методе используйте некоторое фиктивное значение, которое вы ожидаете никогда не соответствовать результатам сети.

final Observable<Integer> databaseRate = latestRatesFromDB(scheduler) 
    .timeout(databaseTimeOut, TimeUnit.MILLISECONDS, scheduler) 
    .doOnError(e -> System.err.println("Failed to get rates from database")) 
    .onErrorResumeNext(Observable.empty()) 
    .firstOrDefault(-1); 

Теперь с помощью метода distinct вы можете получить значение только тогда, когда оно отличается от того, что было до него (именно поэтому вам нужно переопределить equals).

databaseRate.concatWith(networkRate).distinct().skip(1) 
    .subscribe(i -> System.out.println("Updating to " + i), 
     System.err::println, 
     () -> System.out.println("completed")); 

Здесь скорость базы данных была помещена перед скорость сети, чтобы воспользоваться distinct. затем добавляется skip, чтобы всегда игнорировать значение скорости базы данных.


Полный код:

final long networkTimeOut = 100; 
final long databaseTimeOut = 100; 

final TestScheduler scheduler = new TestScheduler(); 

final Observable<Integer> networkRate = ratesFromNetwork(scheduler) 
    .timeout(networkTimeOut, TimeUnit.MILLISECONDS, scheduler) 
    .doOnError(e -> System.err.println("Failed to get rates from network.")); 

final Observable<Integer> databaseRate = latestRatesFromDB(scheduler) 
    .timeout(databaseTimeOut, TimeUnit.MILLISECONDS, scheduler) 
    .doOnError(e -> System.err.println("Failed to get rates from database")) 
    .onErrorResumeNext(Observable.empty()) 
    .firstOrDefault(-1); 

databaseRate.concatWith(networkRate).distinct().skip(1) 
    .subscribe(i -> System.out.println("Updating to " + i), 
     System.err::println, 
     () -> System.out.println("completed")); 

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS); 

Когда networkTimeOut и databaseTimeOut больше, чем 100 печатает:

Updating to 2 
completed 

Когда networkTimeOut меньше 100 печатает:

Failed to get rates from network. 
java.util.concurrent.TimeoutException 

Когда databaseTimeOut менее 100 печатает:

Failed to get rates from database 
Updating to 2 
completed 

И если вы измените latestRatesFromDB и ratesFromNetwork вернуть то же значение, он просто печатает:

completed 

И если вам не нужны форсирование тайм-аутов или каротажа, то это сводится к:

latestRatesFromDB().firstOrDefault(dummyValue) 
    .concatWith(ratesFromNetwork()) 
    .distinct().skip(1) 
    .subscribe(this::save, 
     System.err::println, 
     () -> System.out.println("completed")); 
+0

Да, последний пример - это то, к чему я стремился. Превышение равных делает работу! Благодарю. – Luke