2016-07-27 2 views
0

Есть два потока, которые никогда не завершают:Как получить события из одного потока, который произошел после последнего события из другого потока

--a---b-c-d---e--f----> 

-----1-------2------3--> 

Я хочу получать события из первого потока, который произошел после последнего события из второго потока.

В начале это выглядит следующим образом:

--a-> 
(?) 
----> 
(=) 
--a-> 

После второго потока излучает первое событие это выглядит следующим образом:

--a---b-c-d-> 
    (?) 
-----1------> 
    (=) 
------b-c-d-> 

После нового события во втором потоке:

--a---b-c-d---e--f-> 
     (?) 
-----1-------2-----> 
     (=) 
--------------e--f-> 

И так далее ... Какому набору операторов необходимо было это сделать?

+1

Это отличный вопрос! Я собираюсь взглянуть на это [RxJS 5 Operators By Example: combinationLatest] (https://gist.github.com/btroncone/d6cf141d6f2c00dc6b35#combinelatest), чтобы узнать, могу ли я что-нибудь придумать. – wolfhoundjesse

ответ

2

Вы можете использовать CombineLatest вернуть события, как вы хотели бы, например:

/* Have staggering intervals */ 
var source1 = Rx.Observable.interval(1000) 
    .map(function(i) { 

     return { 
      data: (i + 1), 

      time: new Date() 
     }; 
    }); 

var source2 = Rx.Observable.interval(3000).startWith(-1) 
    .map(function(i) { 

     return { 
      data: (i + 1), 
      time: new Date() 
     }; 
    }); 

// Combine latest of source1 and source2 whenever either gives a value with a selector 
var source = Rx.Observable.combineLatest(
    source1, 
    source2, 
    function(s1, s2) { 

     if (s1.time > s2.time) { 
      return s1.data + ', ' + s2.data; 
     } 

     return undefined; 
    } 
).filter(x => x !== undefined).take(10); 

var subscription = source.subscribe(
    function(x) { 
     console.log('Next: %s', x); 
    }, 
    function(err) { 
     console.log('Error: %s', err); 
    }, 
    function() { 
     console.log('Completed'); 
    }); 
0

Я не 100% на ваших мраморов, так как они, кажется несколько противоречивым, но я хотел бы предложить альтернативу, которая использовать window или buffer, если вам нужно всего лишь объединить события, которые произошли после события из второго источника.

var count = 0; 
//Converts the source into an Observable of Observable windows 
source1.window(source2).subscribe(w => { 
    var local = count++; 
    //Just to show that this is indeed an Observable 
    //Subscribe to the inner Observable 
    w.subscribe(x => console.log(local + ': ' + x)); 
}); 

ПримечаниеЭтот излучает только значения из первого источника, если вы хотите, чтобы они в сочетании со значениями другого источника в то все средства использовать combineLatest или withLatestFrom как предложено другие ответы.