2017-02-21 33 views
1

Я очень новичок в RxJS, и вот моя история. Я хочу, чтобы «сеанс» наблюдался, поэтому новые подписчики всегда будут получать текущий сеанс, а затем все новые сеансы, если они могут появиться. Так что я написал что-то вроде этого:Как заставить publishReplay() пересылать подписку?

var session = Rx.Observable.from([0,1,3]) 
    .do(x => console.log("Useful job")) 
    .publishReplay(1) 
    .refCount(); 

var subscr1 = session.subscribe(x => { 
    console.log("sub1 = " +x) 
    //subscr1.unsubscribe(); 
}) 

console.log("Completed"); 
subscr1.unsubscribe(); 

session.subscribe(x => { 
    console.log("sub2 = " +x) 
}); 

И выход:

Useful job 
sub1 = 0 
Useful job 
sub1 = 1 
Useful job 
sub1 = 3 
Completed 
sub2 = 3 

Почему нет никакой полезной работы, когда SUB2 выписывает? Я ожидаю, что будет совсем холодно!

ответ

0

Первый параметр .publishReplay() это количество элементов, которые он будет переигрывать, чтобы вы могли использовать .publishReplay(3), если вы знаете, что вы всегда можете рассчитывать только 3.

Если вы хотите, чтобы воспроизвести всю последовательность можно сначала собрать все его элементы с toArray(), сохраните массив в ReplaySubject, а затем просто сгладьте массив до отдельных значений.

var session = Rx.Observable.from([0,1,3]) 
    .toArray() 
    .publishReplay(1) 
    .refCount() 
    .concatAll(); 

Обратите внимание, что toArray() ничего не излучает, пока его источник не завершится.

EDIT: Я вижу, в чем проблема сейчас.

Ваш подход правильный, но проблема заключается в Observable.from и предмете, который используется внутри publishReplay(). Источник Observable испускает три элемента и отправляет уведомление complete. Когда Субъект получает уведомление или error, он отмечает себя как stopped и больше не будет воспроизводить какие-либо предметы. И это именно то, что происходит в вашем примере.

Если вы испускаете значения вручную без отправки complete, он будет работать так, как вы ожидали.

// var session = Rx.Observable.from([0,1,3]) 
var session = Rx.Observable.create(subscriber => { 
    subscriber.next(0); 
    subscriber.next(1); 
    subscriber.next(3); 
    }) 
    .do(x => console.log("Useful job")) 
    .publishReplay(1) 
    .refCount(); 

Это выводит на консоль:

Useful job 
sub1 = 0 
Useful job 
sub1 = 1 
Useful job 
sub1 = 3 
Completed 
sub2 = 3 
Useful job 
sub2 = 0 
Useful job 
sub2 = 1 
Useful job 
sub2 = 3 

Смотрите та же проблема: Rx.Subject loses events

+0

Извините, это был всего лишь образец. Исходный «сеанс» на самом деле довольно сложный. Он подключается к EventSource, делает много вещей. И затем испускает объект «сеанс». В какой-то момент он может повторно подключиться и испустить объект «новая сессия». Обычно он заканчивается только «событием выхода». Теперь в следующем событии «Логин» я хочу, чтобы он повторил сеанс, наблюдаемый при повторном подключении. И это не так. – norekhov

+0

@norekhov Можете ли вы сделать демо-симуляцию этой проблемы? Я, вероятно, не понимаю, что такое порядок событий и что такое 'session'. – martin

+0

Imagine Rx.Observable.from ([0,1,3]) является наблюдаемым, который редко испускает новые сеансы. В какой-то момент сеанс «3» становится неактивным. Второй наблюдаемый получает «3», в то время как я хочу, чтобы сеанс наблюдался для перезапуска. – norekhov

1

Наконец я нашел решение для этого.

Заменить .publishReplay (1) с .multicast (() => новый Rx.ReplaySubject (1)). Поэтому в основном я заменяю тему субъектом.

И тогда он работает так, как ожидалось. Поэтому становится абсолютно холодно, когда все отписываются.

+0

Просто имейте в виду, что теперь каждый абонент имеет свой собственный экземпляр 'ReplaySubject' (' ReplaySubject' не распространяется среди всех подписчиков, как с '.publishReplay (1)'. – martin