Я запускаю RxJava и создаю тему для использования onNext()
метод для создания данных. Я использую Весна.Запуск PublishSubject на другую тему rxJava
Это моя установка:
@Component
public class SubjectObserver {
private SerializedSubject<SomeObj, SomeObj> safeSource;
public SubjectObserver() {
safeSource = PublishSubject.<SomeObj>create().toSerialized();
**safeSource.subscribeOn(<my taskthreadExecutor>);**
**safeSource.observeOn(<my taskthreadExecutor>);**
safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() {
@Override
public void onNext(AsyncRemoteRequest asyncRemoteRequest) {
LOGGER.debug("{} invoked.", Thread.currentThread().getName());
doSomething();
}
}
}
public void publish(SomeObj myObj) {
safeSource.onNext(myObj);
}
}
Путь новых данных генерируется на поток RxJava является @Autowire private SubjectObserver subjectObserver
и затем вызывая subjectObserver.publish(newDataObjGenerated)
Независимо от того, что я указываю для subscribeOn()
& observeOn()
:
- Планировщики.io()
- Schedulers.computation()
- мои темы
- Schedulers.newThread
onNext()
и фактическая работа внутри него делается на том же потоке, что на самом деле называет onNext()
по этому вопросу, чтобы генерировать/продукты данные.
Это правильно? Если да, то что мне не хватает? Я ожидал, что doSomething()
будет выполнен в другом потоке.
Update
В моем классе вызывающего, если изменить способ я вызывающий метод publish
, то, конечно, новый поток распределяется для абонента для запуска на.
taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj));
Спасибо,
То, что вы испытываете, предназначено дизайном. Если вы вызываете onNext из threadX, следующий оператор в цепочке будет вызываться одним и тем же потоком. subscribeOn не поможет вам, потому что вы не можете манипулировать, откуда будет вызываться onNext, вы можете поддерживать контракт только с помощью toSerialized. Я бы предположил, что вы проверили «событие» с помощью onNext-operator, затем используйте функцию наблюдения, а после изменения потока вы можете использовать flatMap для вызова doSomething(). В обратном вызове subscribe вы должны установить свой пользовательский интерфейс или что бы вы ни делали с результатом. –