У меня есть подключаемый наблюдатель с несколькими подписчиками.Подключаемый наблюдатель: соблюдайте прекращение всех подписчиков
Каждый абонент вычисляет некоторую бизнес-логику. Например, один из подписчиков хранит результаты в базе данных при каждом вызове onNext
, другой абонент накапливает его результаты в памяти и когда onCompleted
называется, записывает их в файл. Я хочу знать, когда все они закончили свою работу, поэтому я могу продолжать делать другие вещи (объединяться с другими подключаемыми наблюдателями, читать полученные данные из базы данных и т. Д.).
Вот как я наблюдаю за прекращением действия. Он работает только потому, что подписчики выполняются в том же потоке, что и наблюдатель.
public Observable<Boolean> observeTermination() {
return Observable.defer(() -> {
try {
start();
return Observable.just(true);
} catch (RuntimeException e) {
return Observable.just(false);
}
});
}
void start() {
Observable<List<Foo>> fooBatchReaderObservable = fooBatchReader.createObservable(BATCH_SIZE);
ConnectableObservable<List<Foo>> connectableObservable = fooBatchReaderObservable.publish();
subscribers.forEach(s -> connectableObservable.subscribe(s));
connectableObservable.connect();
}
Так что, когда observeTermination
вызывается я не хочу, чтобы выполнить логику в start
метод, но только тогда, когда кто-то присоединяется к нему. Есть ли способ сделать наблюдение лучше?
Хорошо, все плохо. Проблема в том, что мне нужно называть connect
на наблюдаемом месте, а также возвращать boolean
результаты как указание прекращения.
Я думал об изменении подписчиков на простые функции, которые будут вызываться для одних и тех же данных, но это ограничивает их использование только 'onNext'. Таким образом, нет возможности выполнить логику абонента, которая оставлена в 'onStart',' onCompleted', 'onError' – marknorkin
Возможно, я могу наблюдать' Subsription' и метод 'isUnsubscribe'? есть ли способ сделать это? редактирование: но проблема с методом 'connect' – marknorkin
по-прежнему существует. Если вы выполняете функции подписчиков, которые принимают наблюдаемые и генерируют другую наблюдаемую цепочку, основанную на вводе, вы можете обрабатывать ошибки, onNext, запускать и завершать в них. –