2017-02-04 40 views
2

Я пытаюсь понять, как работает нативная реализация потоков. Вот код:Node.JS - нужна помощь для понимания родных потоков

const Stream = require('stream'); 
 

 
// define a custom class to read my data into the stream 
 
class SourceWrapper extends Stream.Readable { 
 
    constructor(opt, content) { 
 
    super(opt); 
 
    this.content = content; 
 
    this.len = content.length; 
 
    this.index = 0; 
 
    } 
 

 
    _read() { 
 
    let i = this.index++; 
 
    if (i >= this.len) 
 
     this.push(null); 
 
    else { 
 
     this.push(this.content[i]); 
 
    } 
 
    } 
 
} 
 

 
// generate some data 
 
const arr = (new Array(10000000)).fill(1); 
 

 
// declare the streams 
 
const firstStream = new SourceWrapper({objectMode: true}, arr); 
 

 
const transform = (x, enc, next) => next(undefined, x * Math.random(x, 10)); 
 

 
const firstMapStream = new Stream.Transform({objectMode: true}); 
 
firstMapStream._transform = transform; 
 
const secondMapStream = new Stream.Transform({objectMode: true}); 
 
secondMapStream._transform = transform; 
 

 
// create a promise to measure execution time 
 
const start = new Date(); 
 

 
new Promise((resolve, reject) => { 
 
    firstStream 
 
    .pipe(firstMapStream) 
 
    .pipe(secondMapStream) 
 
    .on('finish',() => resolve(new Date())); 
 
}) 
 
.then((end) => console.log('execTime', end - start));

Проблема заключается в том, что она работает на небольших наборах данных (т.е. [1,2,3,4]), но, кажется, прекращается вскоре после запуска на большом наборе.

Что мне не хватает? Это как-то связано с objectMode?

Цените любую помощь.

ответ

1

Причина в том, что кто-то должен читать данные из потоков со связыванием data прослушиватель событий. Я переписал ваш код, чтобы понять, как это понять. Также я исправил неверный подсчет индекса, который пропустил нулевой индекс.

'use strict'; 
const Stream = require('stream'); 

// define a custom class to read my data into the stream 
class SourceWrapper extends Stream.Readable { 
    constructor(opt, content) { 
    super(opt); 
    this.content = content; 
    this.len = content.length; 
    this.index = 0; 
    } 

    _read() { 
    let i = this.index; 
    if (i >= this.len) { 
     this.push(null); 
    } else { 
     this.push(this.content[i]); 
    } 
    this.index++; 
    } 
} 


const transform = (x, enc, next) => next(undefined, x * Math.random(x, 10)); 

const transform1 = new Stream.Transform({objectMode: true}); 
transform1._transform = transform; 

const transform2 = new Stream.Transform({objectMode: true}); 
transform2._transform = transform; 


const write = new Stream.Writable({ 
    objectMode: true, 
    write(value, enc, next) { 
     // Do something like writing... 
     next(); 
    } 
}); 


// generate some data 
const arr = (new Array(1000000)).fill(1); 
const read = new SourceWrapper({objectMode: true}, arr); 

new Promise((resolve, reject) => { 
    read 
    .pipe(transform1) 
    .pipe(transform2) 
    .pipe(write) 
    .on('finish',() => { 
     resolve(); 
    }); 
}) 
.then(() => { 
    console.log('Done'); 
}); 
+0

Спасибо. Думаю, теперь я понимаю. – nainy