я следующий код, который демонстрирует выпуск:Spring + RxJava + Расписание хрон: почему бобы называют только один раз соединяемого наблюдаемой
@Component
public class App {
@Autowired S1 s1;
@Autowired S2 s2;
int jobs = 0;
@Scheduled(cron = "0 * * * * ?")
void foo() {
System.out.println("schedule cron job: " + jobs++);
Observable<String> observable = Observable.just("bar");
ConnectableObservable<String> publishedObservable = observable.publish();
publishedObservable.subscribe(s1);
publishedObservable.subscribe(s2);
publishedObservable.connect();
}
}
Subscriber1:
@Component
public class S1 extends Subscriber<String> {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("S1:::: Times called: " + counter.getAndIncrement() + ", input: " + s);
}
}
Subscriber2:
@Component
public class S2 extends Subscriber<String> {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("S2:::: Times called: " + counter.getAndIncrement() + ", input: " + s);
}
}
Выход будет:
schedule cron job: 0
S1:::: Times called: 0, input: bar
S2:::: Times called: 0, input: bar
schedule cron job: 1
schedule cron job: 2
schedule cron job: 3
schedule cron job: 4
......
Почему S1 и S2 не вызываются каждый раз, когда вызван метод foo? Как это достичь?
Это потому, что rx некоторая логика подписок или из-за этих бобах являются одноточечными?
Благодарим вас за разъяснение этого. Это будет вариант? 'publishedObservable.subscribe (s1 :: onNext, s1 :: onError, s1 :: onCompleted); publishedObservable.subscribe (s2 :: onNext, s2 :: onError, s2 :: onCompleted); 'или лучше использовать область прототипа? – marknorkin
Используйте «Наблюдатель». – akarnokd
'Наблюдатели' неудовлетворены и могут быть использованы повторно? у некоторых подписчиков я использую метод 'onStart', где эту логику можно перемещать? 'publishedObservable.subscribe (s1 :: onNext, s1 :: onError, s1 :: onCompleted);' поэтому для меня это не сработает. Думаю – marknorkin