2016-09-11 23 views
1

В настоящее время я создаю приложение узла, которое использует hasoop как долговременное хранилище данных, когда моя служба не работает. Из-за ожидаемого количества передач, которые, как ожидается, будут происходить, и наименьшее время обработки предпочтительнее, данные не записываются на диски, а вместо этого напрямую передаются на то, что я намереваюсь сделать с ним.NodeJS pipeing from tar packing to webhdfs error

Я получаю следующее сообщение об ошибке:

\nodejs_host\node_modules\webhdfs\lib\webhdfs.js:588 
    src.unpipe(req); 
     ^

TypeError: src.unpipe is not a function 
    at Request.onPipe (\nodejs_host\node_modules\webhdfs\lib\webhdfs.js:588:9) 
    at emitOne (events.js:101:20) 
    at Request.emit (events.js:188:7) 
    at Pack.Stream.pipe (stream.js:103:8) 
    at Object.hadoop.putServer (\nodejs_host\hadoop.js:37:29) 
    at Object.<anonymous> (\nodejs_host\hadoop.js:39:8) 
    at Module._compile (module.js:541:32) 
    at Object.Module._extensions..js (module.js:550:10) 
    at Module.load (module.js:458:32) 
    at tryModuleLoad (module.js:417:12) 

Я основывали свой код на следующей документации:

https://github.com/npm/node-tar/blob/master/examples/packer.js https://github.com/harrisiirak/webhdfs/blob/master/README.md (пишущий на удаленный файл)

Это код, который я написали:

var webhdfs = require('webhdfs'); 
var fs = require('fs'); 
var tar = require('tar'); 
var fstream = require('fstream'); 

var hdfs = webhdfs.createClient({ 
    path: '/webhdfs/v1', 
    // private 
}); 

var hadoop = {} 

hadoop.putServer = function(userid, svcid, serverDirectory, callback){ 
    var readStream = fstream.Reader({path: serverDirectory, type: 'Directory'}) 
    var writeStream = hdfs.createWriteStream('/services/' + userid + '/' + svcid + '.tar') 
    var packer = tar.Pack({noProprietary: true}) 

    packer.on('error', function(){console.error(err), callback(err, false)}) 
    readStream.on('error', function(){console.error(err), callback(err, false)}) 
    writeStream.on('error', function(){console.error(err), callback(err, false)}) 
    writeStream.on('finish', function(){callback(null, true)}) 

    readStream.pipe(packer).pipe(writeStream); 
} 
hadoop.putServer('1', '1', 'C:/test', function(){console.log('hadoop.putServer test done')}); 

Документация предполагает, что это должно работать, может ли кто-нибудь быть любезным, чтобы сказать мне, что я сделал не так?

Если бы глупца в Lib \ webhdfs: 588 here

req.on('pipe', function onPipe (src) { 
// Pause read stream 
stream = src; 
stream.pause(); 

// This is not an elegant solution but here we go 
// Basically we don't allow pipe() method to resume reading input 
// and set internal _readableState.flowing to false 
canResume = false; 
stream.on('resume', function() { 
    if (!canResume) { 
    stream._readableState.flowing = false; 
    } 
}); 

// Unpipe initial request 
src.unpipe(req); // <-- Line 588 
req.end(); 
}); 

ответ

0

Ладно, так что я смотрю вокруг по вопросам, на GitHub страницах этих модулей и нашел кого-то, упоминая отказ от пакета дегтя для тар-фс. Дал ему выстрел и мгновенно работает :)

Так что, если у кого-то связанный вопрос с webhdfs & деготь, взглянуть на тар-фс https://github.com/mafintosh/tar-fs

 Смежные вопросы

  • Нет связанных вопросов^_^