2016-10-25 4 views
0

Я запускаю RxJava и создаю тему для использования onNext() метод для создания данных. Я использую Весна.Запуск PublishSubject на другую тему rxJava

Это моя установка:

@Component 
public class SubjectObserver { 
    private SerializedSubject<SomeObj, SomeObj> safeSource; 
    public SubjectObserver() { 
     safeSource = PublishSubject.<SomeObj>create().toSerialized(); 
     **safeSource.subscribeOn(<my taskthreadExecutor>);** 
     **safeSource.observeOn(<my taskthreadExecutor>);** 
     safeSource.subscribe(new Subscriber<AsyncRemoteRequest>() { 
      @Override 
      public void onNext(AsyncRemoteRequest asyncRemoteRequest) { 
      LOGGER.debug("{} invoked.", Thread.currentThread().getName()); 
      doSomething(); 
      } 
     } 
    } 
    public void publish(SomeObj myObj) { 
     safeSource.onNext(myObj); 
    } 
} 

Путь новых данных генерируется на поток RxJava является @Autowire private SubjectObserver subjectObserver и затем вызывая subjectObserver.publish(newDataObjGenerated)

Независимо от того, что я указываю для subscribeOn() & observeOn():

  • Планировщики.io()
  • Schedulers.computation()
  • мои темы
  • Schedulers.newThread

onNext() и фактическая работа внутри него делается на том же потоке, что на самом деле называет onNext() по этому вопросу, чтобы генерировать/продукты данные.

Это правильно? Если да, то что мне не хватает? Я ожидал, что doSomething() будет выполнен в другом потоке.

Update

В моем классе вызывающего, если изменить способ я вызывающий метод publish, то, конечно, новый поток распределяется для абонента для запуска на.

taskExecutor.execute(() -> subjectObserver.publish(newlyGeneratedObj)); 

Спасибо,

+0

То, что вы испытываете, предназначено дизайном. Если вы вызываете onNext из threadX, следующий оператор в цепочке будет вызываться одним и тем же потоком. subscribeOn не поможет вам, потому что вы не можете манипулировать, откуда будет вызываться onNext, вы можете поддерживать контракт только с помощью toSerialized. Я бы предположил, что вы проверили «событие» с помощью onNext-operator, затем используйте функцию наблюдения, а после изменения потока вы можете использовать flatMap для вызова doSomething(). В обратном вызове subscribe вы должны установить свой пользовательский интерфейс или что бы вы ни делали с результатом. –

ответ

3

Каждый оператор на Observable/Subject возвращает новый экземпляр с дополнительным поведением, однако, ваш код просто применяет subscribeOn и observeOn затем выбрасывает то, что они производят, и присоединяются к сырым Subject , Вы должны приковать вызовы методов, а затем подписаться:

safeSource = PublishSubject.<AsyncRemoteRequest>create().toSerialized(); 

safeSource 
.subscribeOn(<my taskthreadExecutor>) 
.observeOn(<my taskthreadExecutor>) 
.subscribe(new Subscriber<AsyncRemoteRequest>() { 
    @Override 
    public void onNext(AsyncRemoteRequest asyncRemoteRequest) { 
     LOGGER.debug("{} invoked.", Thread.currentThread().getName()); 
     doSomething(); 
    } 
}); 

Обратите внимание, что subscribeOn не имеет никакого практического эффекта на PublishSubject, потому что нет подписки побочного эффекта происходит в методе subscribe().

+0

гениальный! большое спасибо, просто упустил этот важный момент. благодаря! – user3169330

 Смежные вопросы

  • Нет связанных вопросов^_^