2016-11-08 5 views
0

Я ищу, чтобы создать класс LocationHandler, который возвращает observable<Location>, я могу отправить новое местоположение и подписчики получить последнее добавленное и любые последующие значения.RX Java 2, Observable, который принимает новые значения, которые необходимо добавить

Я написал этот класс, он работает, но я не знаю, правильно ли это сделать, потому что я добавил обратный вызов, и я плохо его чувствую.

Спасибо за любую помощь.

public class LocationHandler { 
    private MessageHandler<Location> onNewItem; 
    private Observable<Location> locationObservable; 

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) { 
     locationObservable = getHookedObservable() 
       .mergeWith(locationInitBuilder.build()) 
       .replay(1).autoConnect(); 
    } 


    private Observable<Location> getHookedObservable() { 
     return Observable.create(new ObservableOnSubscribe<Location>() { 
      @Override 
      public void subscribe(ObservableEmitter<Location> e) throws Exception { 
       onNewItem = location -> e.onNext(location); 
      } 
     }); 
    } 

    public Observable<Location> getLocation(){ 
     return locationObservable; 
    } 

    public void setLocation(Location address){ // <---------- add new values 
     if (onNewItem != null){ 
      onNewItem.handleMessage(address); 
     } else { 
      throw new IllegalStateException("Cannot add an item to a never subscribed stream"); 
     } 
    } 
} 

После @Blackbelt совет, который я изменил его с ReplaySubject.

public class LocationHandler { 
    private ReplaySubject<Location> inputStream = ReplaySubject.create(1); 
    private Observable<Location> locationObservable; 

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) { 
     locationObservable = locationInitBuilder.build() 
       .mergeWith(inputStream) 
       .replay(1).autoConnect(); 
    } 

    public Observable<Location> getLocation(){ 
     return locationObservable; 
    } 

    public void setLocation(Location address){ 
     inputStream.onNext(address); 
    } 
} 

ответ

2

вы могли бы использовать Subject вместо MessageHandler. Субъект может действовать как наблюдаемый и подписчик одновременно. У вас может быть метод в вашем LocationHandler, который возвращает Subject#asObservable, на который вы подписаны. Внутри, когда setLocation, вам придется вызывать Subject#onNext, предоставляя это место. Существуют различные типы Субъектов. Пожалуйста, обратитесь к документации, чтобы выбрать тот, который лучше подходит для ваших нужд. Например.

public class LocationHandler { 
    BehaviorSubject<GeevLocation> mLocationSubject = BehaviorSubject.create(); 

    public Observable<GeevLocation> getLocation() { 
     return mLocationSubject.asObservable(); 
    } 

    public void setLocation(GeevLocation address){ 
     mLocationSubject.onNext(address); 
    } 
} 

от внешнего вызова getLocation и подписываются на возвращаемом Observable. Когда вызывается setLocation, вы получите объект onNext

1

Как уже сказал вам Блэкбелт, вы должны использовать тему. В частности, я бы использовал BehaviorSubject. По умолчанию темы горячие, но они могут воспроизводить события по подписке. BehaviorSubject даст вам последнее испущенное значение или значение init, если вы подписаны. Каждый абонент получит значения как входящие. Поток никогда не закончится, потому что он горячий. Пожалуйста, помните, чтобы обрабатывать ошибки, потому что второй onError будет проглочен.

Пример-код

class Location { 

} 

class LocationInitializationBuilder { 
    static Location build() { 
     return new Location(); 
    } 
} 

class LocationHandler { 
    private Subject<Location> locationObservable; 

    public LocationHandler(LocationInitializationBuilder locationInitBuilder) { 
     Location initialValue = LocationInitializationBuilder.build(); 

     locationObservable = BehaviorSubject.<Location>createDefault(initialValue).toSerialized(); 
    } 

    public Observable<Location> getLocation() { 
     return locationObservable.hide(); 
    } 

    public void setLocation(Location address) { // <---------- add new values 
     locationObservable.onNext(address); 
    } 
} 

public class LocationTest { 
    @Test 
    public void name() throws Exception { 
     LocationHandler locationHandler = new LocationHandler(new LocationInitializationBuilder()); 

     TestObserver<Location> test = locationHandler.getLocation().test(); 

     locationHandler.setLocation(new Location()); 

     test.assertValueCount(2); 
    } 
} 
+0

На самом деле я не могу использовать поведение, потому что это поток я получаю через LocationInitializationBuilder.build. Поведение требует определенной ценности, которую я не могу обеспечить при создании. –

+0

Да, я вижу, где проблема. Ваше решение выглядит законным. –

+1

'BehaviourSubject' имеет статический метод' create', который создает пустой 'BehaviourSubject' – Blackbelt