Я изо всех сил пытаюсь преобразовать потоки узлов в Rxjs Observables.Преобразование потоков узлов в Rx.js Наблюдаемые
Поток сам по себе отлично работает, когда я пытаюсь использовать 1 URL. Но когда я пытаюсь сопоставить одну и ту же функцию по массиву URLS, я получаю ошибки.
Я использую Rx.Node для преобразования потока в наблюдаемый.
Это то, что я в настоящее время пытается
// data_array is an array of 10 urls that I'm scraping data from.
let parentStream = Rx.Observable.from(data_array);
parentStream.map(createStream).subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete'));
function createStream(url){
return RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*')))
}
Но это выход X 10 (число URLS в data_array)
RefCountObservable {
source:
ConnectableObservable {
source: AnonymousObservable { source: undefined, __subscribe: [Function] },
_connection: null,
_source: AnonymousObservable { source: [Object], __subscribe: [Function: subscribe] },
_subject:
Subject {
isDisposed: false,
isStopped: false,
observers: [],
hasError: false } },
_count: 0,
_connectableSubscription: null }
Я сначала подумал flatMap бы потому что он сглаживает наблюдаемые в наблюдаемом .... но когда я пытаюсь flatMap, я получаю это:
Complete
Error TypeError: unknown type returned
Однако, если я делаю это:
Это работает для 1 URL, но я не могу охватить все URLs в data_array в одном потоке.
let stream = RxNode.fromStream(x(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]).write().pipe(JSONStream.parse('*')))
stream.subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete'))
Я чувствую, что я что-то недопонимание не только потому, что очистка не работает для нескольких URLS, но даже тогда, когда он работает во втором примере .... я получаю «Complete», прежде чем все данные поступают.
Очевидно, я что-то не понимаю. Любая помощь будет замечательной. Благодарю.
* UPDATE *
Я попробовал другой путь, который работает, но не использует Node Stream. Потоки узлов были бы идеальными, поэтому все же хотелось бы, чтобы приведенный выше пример работал.
Подход, который я использовал, состоял в том, чтобы обернуть обещание вокруг моей функции соскабливания в сети, то есть scrape ниже. Это работает, но в результате получается десять огромных массивов со всеми данными каждого URL-адреса в каждом массиве. То, что я действительно хочу, это поток объектов, который я могу составить ряд преобразований по мере прохождения объектов данных.
Здесь разные, но работая подход:
let parentStream = Rx.Observable.from(data_array);
parentStream.map(url => {
return Rx.Observable.defer(() => {
return scrape(url, '#centercol ul li', [{name: 'a', link: '[email protected]'}]);
})
})
.concatAll()
.subscribe(x => console.log(x), (e)=> console.log('Error', e), console.log('Complete'));
function scrape(url, selector, scope) {
return new Promise(
(resolve, reject) => x(
url,
selector,
scope
)((error, result) => error != null ? reject(error) : resolve(result))
);
}