2016-11-09 3 views
0

У меня есть Observable<String>, который выполняет некоторую работу. После того, как его сделать, он закрывает соединение (через .setDisposable(Disposables.fromAction(logic*);. Проблема, мне нужно это близко действие, которое будет выполняться на тот же планировщик, как фактическая нагрузка.RxJava 2: всегда отписаться от планировщика .subscribeOn (..)?

Возможно ли это? Как?

Я не хочу чтобы заставить Наблюдаемые выполнять на данном планировщиком, напримерmyObservable.subscribeOn(x).unsubscribeOn(x);;

+0

Вы пробовали наблюдать за оператором до этого setDisposable? – paul

+0

@paul Я думаю, вы неправильно поняли мой вопрос. Im заинтересован только в объединении планировщика, используемого между unsubscribeOn и subscribeOn. – zoltish

ответ

1

Вам нужен однопоточный планировщик для этого, чтобы работать, и вы можете получить через Schedulers.from(Executor) или новый SingleScheduler (предупреждение: внутренний).

Проблема с unsubscribeOn заключается в том, что она работает только в том случае, если нисходящий поток фактически располагает/отменяет поток (в отличие от 1.x, где это происходит почти все время).

Лучший способ заключается в использовании using с вышеупомянутым настраиваемым планировщиком и вручную назначить очистку:

Observable<T> createObservable(Scheduler scheduler) { 
    return Observable.create(s -> { 
     Resource res = ... 

     s.setCancellable(() -> 
      scheduler.scheduleDirect(() -> 
       res.close() // may need try-catch here 
      ) 
     ); 

     s.onNext(...); 
     s.onComplete(); 
    }).subscribeOn(scheduler); 
} 

Scheduler scheduler = new SingleScheduler(); 

createObservable(scheduler) 
.map(...) 
.filter(...) 
.subscribe(...); 

Но обратите внимание, что если create логика не сдастся планировщик (путем прекращения или собираются асинхры), отмена логика может никогда не выполняться из-за того же самого пула livelock.

+0

К сожалению, у меня нет доступа к планировщику - его доступно только в «родительском» наблюдаемом: например. Observable.combineLatest (obs1, obs2) .subscribeOn (...), где obs1 или obs2 будут наблюдаемыми в вашем ответе. – zoltish

+0

Затем вы должны изменить свой код, включая родителя. – akarnokd

+0

@akarnokd У меня есть тот же самый пул livelock, о котором вы упоминали: мой код заблокирован на sync 'input.readLine()' поэтому я не могу освободить поток, чтобы я мог использовать его для вызова отмены. Есть предположения? – ESala