2016-02-01 3 views
19

Его не на 100% ясно для меня, как работает оператор RxJs 5 share(), см. Здесь latest docs. Jsbin для вопроса here.Как работает оператор RxJs 5 share()?

Если я создаю наблюдаемый с серией 0 до 2, каждого значения, разделенного одним вторых:

var source = Rx.Observable.interval(1000) 
.take(5) 
.do(function (x) { 
    console.log('some side effect'); 
}); 

И если я создаю два абонентов эту наблюдаемый:

source.subscribe((n) => console.log("subscriptor 1 = " + n)); 
source.subscribe((n) => console.log("subscriptor 2 = " + n)); 

Я получаю это в консоли:

"some side effect ..." 
"subscriptor 1 = 0" 
"some side effect ..." 
"subscriptor 2 = 0" 
"some side effect ..." 
"subscriptor 1 = 1" 
"some side effect ..." 
"subscriptor 2 = 1" 
"some side effect ..." 
"subscriptor 1 = 2" 
"some side effect ..." 
"subscriptor 2 = 2" 

Я думал, что каждая подписка будет подписана на тот же Наблюдаемый, но, похоже, это не так! Его, как акт подписки, создает совершенно отдельный Наблюдаемый!

Но если оператор share() добавляется к источнику наблюдаемом:

var source = Rx.Observable.interval(1000) 
.take(3) 
.do(function (x) { 
    console.log('some side effect ...'); 
}) 
.share(); 

Тогда мы получим это:

"some side effect ..." 
"subscriptor 1 = 0" 
"subscriptor 2 = 0" 
"some side effect ..." 
"subscriptor 1 = 1" 
"subscriptor 2 = 1" 
"some side effect ..." 
"subscriptor 1 = 2" 
"subscriptor 2 = 2" 

Что я и следовало ожидать без share().

Что здесь происходит, как работает оператор share()? Создает ли каждая подписка новую цепочку Observable?

ответ

15

Будьте осторожны, что вы используете RxJS v5, в то время как ваша ссылка для документации выглядит RxJS v4. Я не помню специфику, но я думаю, что оператор share прошел некоторые изменения, в частности, когда дело доходит до завершения и повторной подписки, но не задумывайтесь об этом.

На ваш вопрос, как вы показали в своем исследовании, ваши ожидания не соответствуют дизайну библиотеки. Наблюдаемые лениво создают поток данных, конкретно инициируя поток данных, когда абонент подписывается. Когда второй абонент подписывается на одно и то же наблюдаемое, запускается другой новый поток данных, как если бы он был первым абонентом (так что да, каждая подписка создает новую цепочку наблюдаемых, как вы сказали). Это то, что придумано в терминологии RxJS как наблюдаемое в холодном состоянии, и это поведение по умолчанию для наблюдаемого RxJS. Если вы хотите, чтобы наблюдаемый, который отправляет свои данные подписчикам, которые он имеет в момент поступления данных, это придумано как горячее наблюдаемое, и одним из способов получить горячее наблюдение является использование оператора share.

Вы можете найти иллюстрированные потоки подписки и данных здесь: Hot and Cold observables : are there 'hot' and 'cold' operators? (это справедливо для RxJS v4, но большая часть из них действительна для v5).

10

доля составляет наблюдаемая "горячая", если эти 2 условия:

  1. количество абонентов> 0
  2. И наблюдаемая не законченных

Scenario1: количество абонентов > 0 и наблюдаемый не завершен до новой подписки

var shared = rx.Observable.interval(5000).take(2).share(); 
var startTime = Date.now(); 
var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`); 
}; 

var observer1 = shared.subscribe(log('observer1')), 
    observer2; 

setTimeout(()=>{ 
    observer2 = shared.subscribe(log('observer2')); 
}, 3000); 

// emission for both observer 1 and observer 2, with the samve value at startTime + 5 seconds 
// another emission for both observers at: startTime + 10 seconds 

Сценарий 2: количество подписчиков равно нулю перед новой подпиской. Становится «холодным»

var shared = rx.Observable.interval(5000).take(2).share(); 
    var startTime = Date.now(); 
    var log = (x) => (value) => { 
    console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`); 
}; 

var observer1 = shared.subscribe(log('observer1')), 
    observer2; 

setTimeout(()=>{ 
    observer1.unsubscribe(); 
}, 1000); 

setTimeout(()=>{ 
    observer2 = shared.subscribe(log('observer2')); // number of subscribers is 0 at this time 
}, 3000); 
// observer2's onNext is called at startTime + 8 seconds 
// observer2's onNext is called at startTime + 13 seconds 

Сценарий 3: когда наблюдаемый был завершен до новой подписки. Становится «холодным»

var shared = rx.Observable.interval(5000).take(2).share(); 
    var startTime = Date.now(); 
    var log = (x) => (value) => { 
     console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`); 
    }; 

var observer1 = shared.subscribe(log('observer1')), 
    observer2; 

setTimeout(()=>{ 
    observer2 = shared.subscribe(log('observer2')); 
}, 12000); 

// 2 emission for observable 1, at startTime + 5 secs, and at startTime + 10secs 
// 2 emissions for observable 2,at startTime + 12 + 5 secs, and at startTime + 12 + 10secs