2016-01-20 3 views
0

Мне нужно заполнить PG файлом tsv ~ 1,5 G. Я планирую использовать потоковое и pg-copy-stream, и он работал для прямой копии. Затем мне нужно сделать некоторую трансформацию и добавить сквозной канал, и это не удалось. Я предполагаю, что это, вероятно, проблема с буфером, и кто-то должен был это сделать.Поток из файла, трансформации и ошибки postgres

Происхождение tsvfile.txt имеет формат

V1\tV2\tV3\tV4\n 
V2\tV2\tV3\tV4\n 

Код

var fs = require('fs'), pg = require('pg'), es = require('es'), pgs = require('pg-copy-stream'); 
var filename = 'tsvfile.txt'; 
var pgkey = 'somepgkey'; 
pg.connect(pgkey, function(err, client, done){ 
    var query = client.query(pgs.from('COPY table1 (C1, C2, C3, C4) FROM STDIN')); 
    var fstream = fs.createReadStream(filename); 
    fstream.pipe(es.split()) 
      .pipe(es.mapSync(function(line){ 
       var midline = line.split('\t').map(sometransform()).join('\t'); 
       return midline + '\n'; 
       //not sure \n is necessary here 
      }).pipe(query) 
      .on('end', done) 
      .on('err', somethingelse) 
}) 

ошибка я получил error: extra data after last expected column, но работает отлично, если я удалить первые две трубы.

+0

Update: если удалить первые 2 трубы, он отлично работает. Но когда я регистрирую прослушиватель для запроса (который является потоком) с помощью 'query.on ('data', callback), он снова заполняет весь процесс. pg отключит связь и данные не будут сохранены. Это может быть что-то с модулем pg-copy-stream. –

ответ

0

Первое, что я попробую, это удалить + '\ n' - возможно, это испортило новую строку. Если это не поможет. Первым шагом было бы сделать функцию sometransform(), которая ничего не изменит. Если это работает без ошибок, ваша проблема в функции sometranform() (вы, например, добавить \ т внутри функции?)

+0

Я удалил весь раздел преобразования и оставил только es.split(), но он все еще не работает. Удалите оба преобразования, и es.split() решит проблему. Я подозреваю, что делаю трансформацию, я испортил буфер в потоке, но не знаю, как исследовать и/или исправлять его. –

0

line.split('\t').map(sometransform()).join('\t')

someTransform ли действительно вернуть функцию? который используется для преобразования данных?

Если ответ «нет», или «что?», Попробуйте это: line.split('\t').map(sometransform).join('\t')