2013-07-04 7 views
17

Я хотел бы объединить два потока Node.js в один, если возможно, путем их конвейера. Я использую потоки Transform.Создание потока Node.js из двух потоков с потоками

Другими словами, я хотел бы, чтобы моя библиотека вернула myStream людям, которым нужно будет пользоваться. Например, они могли бы написать:

process.stdin.pipe(myStream).pipe(process.stdout); 

И внутренне я использую сторонний vendorStream что делает некоторую работу, подключил в свою собственную логику, содержащуюся в myInternalStream. Итак, что выше, перевести на:

process.stdin.pipe(vendorStream).pipe(myInternalStream).pipe(process.stdout); 

Могу ли я сделать что-то подобное? Я пробовал var myStream = vendorStream.pipe(myInternalStream), но это явно не работает.

Чтобы провести аналогию с bash, скажем, я хочу, чтобы написать программу, которая проверяет, является ли буква h присутствует в последней строке некоторого потока (tail -n 1 | grep h), я могу создать скрипт:

# myscript.sh 
tail -n 1 | grep h 

И тогда, если люди делают:

$ printf "abc\ndef\nghi" | . myscript.sh 

Он просто работает.

Это то, что я до сих пор:

// Combine a pipe of two streams into one stream 

var util = require('util') 
    , Transform = require('stream').Transform; 

var chunks1 = []; 
var stream1 = new Transform(); 
var soFar = ''; 
stream1._transform = function(chunk, encoding, done) { 
    chunks1.push(chunk.toString()); 
    var pieces = (soFar + chunk).split('\n'); 
    soFar = pieces.pop(); 
    for (var i = 0; i < pieces.length; i++) { 
    var piece = pieces[i]; 
    this.push(piece); 
    } 
    return done(); 
}; 

var chunks2 = []; 
var count = 0; 
var stream2 = new Transform(); 
stream2._transform = function(chunk, encoding, done) { 
    chunks2.push(chunk.toString()); 
    count = count + 1; 
    this.push(count + ' ' + chunk.toString() + '\n'); 
    done(); 
}; 

var stdin = process.stdin; 
var stdout = process.stdout; 

process.on('exit', function() { 
    console.error('chunks1: ' + JSON.stringify(chunks1)); 
    console.error('chunks2: ' + JSON.stringify(chunks2)); 
}); 
process.stdout.on('error', process.exit); 


// stdin.pipe(stream1).pipe(stream2).pipe(stdout); 

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js 
// Outputs: 
// 1 abc 
// 2 def 
// 3 ghi 
// chunks1: ["abc\nd","ef\nghi\n"] 
// chunks2: ["abc","def","ghi"] 

// Best working solution I could find 
var stream3 = function(src) { 
    return src.pipe(stream1).pipe(stream2); 
}; 
stream3(stdin).pipe(stdout); 

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js 
// Outputs: 
// 1 abc 
// 2 def 
// 3 ghi 
// chunks1: ["abc\nd","ef\nghi\n"] 
// chunks2: ["abc","def","ghi"] 

Является ли это вообще возможно? Дайте мне знать, не ясно ли, что я пытаюсь сделать.

Спасибо!

ответ

25

Вы можете следить за то, чтобы быть передан в ваш поток, а затем unpipe оно и конвейер к потокам вы заинтересованы в:

var PassThrough = require('stream').PassThrough; 

var stream3 = new PassThrough(); 

// When a source stream is piped to us, undo that pipe, and save 
// off the source stream piped into our internally managed streams. 
stream3.on('pipe', function(source) { 
    source.unpipe(this); 
    this.transformStream = source.pipe(stream1).pipe(stream2); 
}); 

// When we're piped to another stream, instead pipe our internal 
// transform stream to that destination. 
stream3.pipe = function(destination, options) { 
    return this.transformStream.pipe(destination, options); 
}; 

stdin.pipe(stream3).pipe(stdout); 

Вы можете извлечь эту функцию в свой собственный constructable класса потока :

var util = require('util'); 
var PassThrough = require('stream').PassThrough; 

var StreamCombiner = function() { 
    this.streams = Array.prototype.slice.apply(arguments); 

    this.on('pipe', function(source) { 
    source.unpipe(this); 
    for(i in this.streams) { 
     source = source.pipe(this.streams[i]); 
    } 
    this.transformStream = source; 
    }); 
}; 

util.inherits(StreamCombiner, PassThrough); 

StreamCombiner.prototype.pipe = function(dest, options) { 
    return this.transformStream.pipe(dest, options); 
}; 

var stream3 = new StreamCombiner(stream1, stream2); 
stdin.pipe(stream3).pipe(stdout); 
+0

Спасибо большое @brandon, это потрясающе! Обновлен мой Gist https://gist.github.com/nicolashery/5910969 –

+0

Это потрясающе. Я думал о том, чтобы делать что-то подобное, но у меня просто не было уверенности, что я не пропустил какую-то тонкость, которая сделала бы мое решение неправильным. Спасибо за доверие – FellowMD

+0

FWIW, чтобы это решение работало, вам необходимо передать stream3 в исходный код (в этом случае stdin), прежде чем передавать его на стандартный вывод. Итак, no stream3.pipe (stdout); stream3.write (данные); Но это отличная помощь! Спасибо! –

2

Одним из вариантов является, возможно, использовать multipipe, который позволяет объединять несколько преобразований вместе, завернуть в виде одного преобразования потока:

// my-stream.js 
var multipipe = require('multipipe'); 

module.exports = function createMyStream() { 
    return multipipe(vendorStream, myInternalStream); 
}; 

Тогда вы можете сделать:

var createMyStream = require('./my-stream'); 

var myStream = createMyStream(); 

process.stdin.pipe(myStream).pipe(process.stdout); 

 Смежные вопросы

  • Нет связанных вопросов^_^