2016-09-29 1 views
1

Я изо всех сил пытаюсь преобразовать потоки узлов в Rxjs Observables.Преобразование потоков узлов в Rx.js Наблюдаемые

Поток сам по себе отлично работает, когда я пытаюсь использовать 1 URL. Но когда я пытаюсь сопоставить одну и ту же функцию по массиву URLS, я получаю ошибки.

Я использую Rx.Node для преобразования потока в наблюдаемый.

Это то, что я в настоящее время пытается

// data_array is an array of 10 urls that I'm scraping data from. 
let parentStream = Rx.Observable.from(data_array); 

parentStream.map(createStream).subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')); 

function createStream(url){ 
    return RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*'))) 
} 

Но это выход X 10 (число URLS в data_array)

RefCountObservable { 
source: 
    ConnectableObservable { 
    source: AnonymousObservable { source: undefined, __subscribe: [Function] }, 
_connection: null, 
_source: AnonymousObservable { source: [Object], __subscribe: [Function: subscribe] }, 
_subject: 
    Subject { 
    isDisposed: false, 
    isStopped: false, 
    observers: [], 
    hasError: false } }, 
_count: 0, 
_connectableSubscription: null } 

Я сначала подумал flatMap бы потому что он сглаживает наблюдаемые в наблюдаемом .... но когда я пытаюсь flatMap, я получаю это:

Complete 
Error TypeError: unknown type returned 

Однако, если я делаю это:

Это работает для 1 URL, но я не могу охватить все URLs в data_array в одном потоке.

let stream = RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*'))) 

stream.subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')) 

Я чувствую, что я что-то недопонимание не только потому, что очистка не работает для нескольких URLS, но даже тогда, когда он работает во втором примере .... я получаю «Complete», прежде чем все данные поступают.

Очевидно, я что-то не понимаю. Любая помощь будет замечательной. Благодарю.

* UPDATE *

Я попробовал другой путь, который работает, но не использует Node Stream. Потоки узлов были бы идеальными, поэтому все же хотелось бы, чтобы приведенный выше пример работал.

Подход, который я использовал, состоял в том, чтобы обернуть обещание вокруг моей функции соскабливания в сети, то есть scrape ниже. Это работает, но в результате получается десять огромных массивов со всеми данными каждого URL-адреса в каждом массиве. То, что я действительно хочу, это поток объектов, который я могу составить ряд преобразований по мере прохождения объектов данных.

Здесь разные, но работая подход:

let parentStream = Rx.Observable.from(data_array); 

parentStream.map(url => { 
    return Rx.Observable.defer(() => { 
     return scrape(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]); 
    }) 
}) 
    .concatAll() 
    .subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')); 

function scrape(url, selector, scope) { 
    return new Promise(
     (resolve, reject) => x(
      url, 
      selector, 
      scope 
     )((error, result) => error != null ? reject(error) : resolve(result)) 
    ); 
} 

ответ

1

* Решение * я понял это. Я приложил решение ниже:

Вместо этого, используя RxNode, я решил использовать Rx.Observable.fromEvent().

Узел потоков испускает события, будь то новые данные, ошибка или полная.

Так что fromEvent Статический оператор слушает событие «данные» и создает новое Наблюдаемое для каждого события.

Затем я объединил все эти и подписался.Вот код:

let parentStream = Rx.Observable.from(data_array); 
parentStream.map((url)=> { return createEventStream(url); }).mergeAll().subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete')); 

function createEventStream(url){ 
    return Rx.Observable.fromEvent(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*')), 'data'); 
} 

 Смежные вопросы

  • Нет связанных вопросов^_^