В настоящее время я создаю приложение узла, которое использует hasoop как долговременное хранилище данных, когда моя служба не работает. Из-за ожидаемого количества передач, которые, как ожидается, будут происходить, и наименьшее время обработки предпочтительнее, данные не записываются на диски, а вместо этого напрямую передаются на то, что я намереваюсь сделать с ним.NodeJS pipeing from tar packing to webhdfs error
Я получаю следующее сообщение об ошибке:
\nodejs_host\node_modules\webhdfs\lib\webhdfs.js:588
src.unpipe(req);
^
TypeError: src.unpipe is not a function
at Request.onPipe (\nodejs_host\node_modules\webhdfs\lib\webhdfs.js:588:9)
at emitOne (events.js:101:20)
at Request.emit (events.js:188:7)
at Pack.Stream.pipe (stream.js:103:8)
at Object.hadoop.putServer (\nodejs_host\hadoop.js:37:29)
at Object.<anonymous> (\nodejs_host\hadoop.js:39:8)
at Module._compile (module.js:541:32)
at Object.Module._extensions..js (module.js:550:10)
at Module.load (module.js:458:32)
at tryModuleLoad (module.js:417:12)
Я основывали свой код на следующей документации:
https://github.com/npm/node-tar/blob/master/examples/packer.js https://github.com/harrisiirak/webhdfs/blob/master/README.md (пишущий на удаленный файл)
Это код, который я написали:
var webhdfs = require('webhdfs');
var fs = require('fs');
var tar = require('tar');
var fstream = require('fstream');
var hdfs = webhdfs.createClient({
path: '/webhdfs/v1',
// private
});
var hadoop = {}
hadoop.putServer = function(userid, svcid, serverDirectory, callback){
var readStream = fstream.Reader({path: serverDirectory, type: 'Directory'})
var writeStream = hdfs.createWriteStream('/services/' + userid + '/' + svcid + '.tar')
var packer = tar.Pack({noProprietary: true})
packer.on('error', function(){console.error(err), callback(err, false)})
readStream.on('error', function(){console.error(err), callback(err, false)})
writeStream.on('error', function(){console.error(err), callback(err, false)})
writeStream.on('finish', function(){callback(null, true)})
readStream.pipe(packer).pipe(writeStream);
}
hadoop.putServer('1', '1', 'C:/test', function(){console.log('hadoop.putServer test done')});
Документация предполагает, что это должно работать, может ли кто-нибудь быть любезным, чтобы сказать мне, что я сделал не так?
Если бы глупца в Lib \ webhdfs: 588 here
req.on('pipe', function onPipe (src) {
// Pause read stream
stream = src;
stream.pause();
// This is not an elegant solution but here we go
// Basically we don't allow pipe() method to resume reading input
// and set internal _readableState.flowing to false
canResume = false;
stream.on('resume', function() {
if (!canResume) {
stream._readableState.flowing = false;
}
});
// Unpipe initial request
src.unpipe(req); // <-- Line 588
req.end();
});