Вам нужен однопоточный планировщик для этого, чтобы работать, и вы можете получить через Schedulers.from(Executor)
или новый SingleScheduler
(предупреждение: внутренний).
Проблема с unsubscribeOn
заключается в том, что она работает только в том случае, если нисходящий поток фактически располагает/отменяет поток (в отличие от 1.x, где это происходит почти все время).
Лучший способ заключается в использовании using
с вышеупомянутым настраиваемым планировщиком и вручную назначить очистку:
Observable<T> createObservable(Scheduler scheduler) {
return Observable.create(s -> {
Resource res = ...
s.setCancellable(() ->
scheduler.scheduleDirect(() ->
res.close() // may need try-catch here
)
);
s.onNext(...);
s.onComplete();
}).subscribeOn(scheduler);
}
Scheduler scheduler = new SingleScheduler();
createObservable(scheduler)
.map(...)
.filter(...)
.subscribe(...);
Но обратите внимание, что если create
логика не сдастся планировщик (путем прекращения или собираются асинхры), отмена логика может никогда не выполняться из-за того же самого пула livelock.
Вы пробовали наблюдать за оператором до этого setDisposable? – paul
@paul Я думаю, вы неправильно поняли мой вопрос. Im заинтересован только в объединении планировщика, используемого между unsubscribeOn и subscribeOn. – zoltish