2016-09-28 4 views
0

я следующий код, который демонстрирует выпуск:Spring + RxJava + Расписание хрон: почему бобы называют только один раз соединяемого наблюдаемой

@Component 
public class App { 
    @Autowired S1 s1; 
    @Autowired S2 s2; 
    int jobs = 0; 
    @Scheduled(cron = "0 * * * * ?") 
    void foo() { 
     System.out.println("schedule cron job: " + jobs++); 
     Observable<String> observable = Observable.just("bar"); 
     ConnectableObservable<String> publishedObservable = observable.publish(); 
     publishedObservable.subscribe(s1); 
     publishedObservable.subscribe(s2); 
     publishedObservable.connect(); 
    } 
} 

Subscriber1:

@Component 
public class S1 extends Subscriber<String> { 
    private AtomicInteger counter = new AtomicInteger(0); 

    @Override 
    public void onCompleted() { 
    } 

    @Override 
    public void onError(Throwable e) { 
    } 

    @Override 
    public void onNext(String s) { 
     System.out.println("S1:::: Times called: " + counter.getAndIncrement() + ", input: " + s); 

    } 
} 

Subscriber2:

@Component 
public class S2 extends Subscriber<String> { 
    private AtomicInteger counter = new AtomicInteger(0); 

    @Override 
    public void onCompleted() { 
    } 

    @Override 
    public void onError(Throwable e) { 
    } 

    @Override 
    public void onNext(String s) { 
     System.out.println("S2:::: Times called: " + counter.getAndIncrement() + ", input: " + s); 
    } 
} 

Выход будет:

schedule cron job: 0 
S1:::: Times called: 0, input: bar 
S2:::: Times called: 0, input: bar 
schedule cron job: 1 
schedule cron job: 2 
schedule cron job: 3 
schedule cron job: 4 
...... 

Почему S1 и S2 не вызываются каждый раз, когда вызван метод foo? Как это достичь?

Это потому, что rx некоторая логика подписок или из-за этих бобах являются одноточечными?

ответ

2

Почему S1 и S2 не вызываются каждый раз, когда вызван метод foo?

RxJava Subscriber s не отслеживанием состояния, и как только они потребляли последовательность они больше не могут использоваться и сообщают о себе, как отписались. Подписка с ними снова не влияет. Вы должны повторно создавать их каждый раз, когда вам нужно подписаться на источник.

+0

Благодарим вас за разъяснение этого. Это будет вариант? 'publishedObservable.subscribe (s1 :: onNext, s1 :: onError, s1 :: onCompleted); publishedObservable.subscribe (s2 :: onNext, s2 :: onError, s2 :: onCompleted); 'или лучше использовать область прототипа? – marknorkin

+0

Используйте «Наблюдатель». – akarnokd

+0

'Наблюдатели' неудовлетворены и могут быть использованы повторно? у некоторых подписчиков я использую метод 'onStart', где эту логику можно перемещать? 'publishedObservable.subscribe (s1 :: onNext, s1 :: onError, s1 :: onCompleted);' поэтому для меня это не сработает. Думаю – marknorkin