2016-10-19 1 views
0

У меня есть подключаемый наблюдатель с несколькими подписчиками.Подключаемый наблюдатель: соблюдайте прекращение всех подписчиков

Каждый абонент вычисляет некоторую бизнес-логику. Например, один из подписчиков хранит результаты в базе данных при каждом вызове onNext, другой абонент накапливает его результаты в памяти и когда onCompleted называется, записывает их в файл. Я хочу знать, когда все они закончили свою работу, поэтому я могу продолжать делать другие вещи (объединяться с другими подключаемыми наблюдателями, читать полученные данные из базы данных и т. Д.).

Вот как я наблюдаю за прекращением действия. Он работает только потому, что подписчики выполняются в том же потоке, что и наблюдатель.

public Observable<Boolean> observeTermination() { 
    return Observable.defer(() -> { 
     try { 
      start(); 
      return Observable.just(true); 
     } catch (RuntimeException e) { 
      return Observable.just(false); 
     } 
    }); 
} 

void start() { 
    Observable<List<Foo>> fooBatchReaderObservable = fooBatchReader.createObservable(BATCH_SIZE); 

    ConnectableObservable<List<Foo>> connectableObservable = fooBatchReaderObservable.publish(); 
    subscribers.forEach(s -> connectableObservable.subscribe(s)); 

    connectableObservable.connect(); 
} 

Так что, когда observeTermination вызывается я не хочу, чтобы выполнить логику в start метод, но только тогда, когда кто-то присоединяется к нему. Есть ли способ сделать наблюдение лучше?

Хорошо, все плохо. Проблема в том, что мне нужно называть connect на наблюдаемом месте, а также возвращать boolean результаты как указание прекращения.

ответ

0

Не правильный ответ, но ему необходимо пространство для правильного объяснения. Было бы намного проще, если бы вы могли общаться с Observables вместо подписчиков; что дает вам большую гибкость при составлении их;

Учитывая, что у вас есть components:

Collection<Function<Observable<T>, Observable<?>>> components: 
Observable<T> tObs = ... .publish().autoConnect(components.size()); 
Observable 
.from(components) 
.flatMap(component -> component.apply(tObs)) 
.ignoreElements() 
.doOnTerminate() // or .defaultIfEmpty(...), or .switchIfEmpty(...) 
.subscribe(...); 

На самом деле, я бы сказал, что вы не должны даже подписаться на все здесь, просто создать наблюдаемым и вернуть его, пусть он будет использоваться для композиции в других частях вашего кода.

+0

Я думал об изменении подписчиков на простые функции, которые будут вызываться для одних и тех же данных, но это ограничивает их использование только 'onNext'. Таким образом, нет возможности выполнить логику абонента, которая оставлена ​​в 'onStart',' onCompleted', 'onError' – marknorkin

+0

Возможно, я могу наблюдать' Subsription' и метод 'isUnsubscribe'? есть ли способ сделать это? редактирование: но проблема с методом 'connect' – marknorkin

+0

по-прежнему существует. Если вы выполняете функции подписчиков, которые принимают наблюдаемые и генерируют другую наблюдаемую цепочку, основанную на вводе, вы можете обрабатывать ошибки, onNext, запускать и завершать в них. –