2017-01-21 11 views
1

Я использую csv-to-json, аккуратную библиотеку для обработки файлов CSV.Приостановка читаемого потока в Node.js

У меня есть прецедент, где мне нужно обработать большой (> 2 миллиона строк) CSV и вставить его в БД.

Чтобы сделать это без проблем с памятью, я намерен обрабатывать CSV как поток, приостанавливая поток каждые 10000 строк, вставляя строки в мой БД и затем возобновляя поток.

По какой-то причине я не могу представить pause ручей.

Возьмем, например, следующий код:

const rs = fs.createReadStream("./foo.csv"); 
rs.pause(); 

let count = 0; 

csv() 
.fromStream(rs) 
.on("json", (json) => { 
    count++; 
    console.log(count); 
}) 
.on("done",() => { 
    cb(null, count); 
}) 
.on("error", (err) => { 
    cb(err); 
}) 

count регистрируется в 200 раз (это сколько строк у меня в CSV) - Я ожидал, что это не что-нибудь войти, так как поток остановился перед передачей оно до fromStream()

+0

вы делаете одну строку в момент вставки в базе данных? почему вы не создаете очередь и не ограничиваете одновременное выполнение запросов или используете какой-либо метод async для предотвращения утечек памяти и предотвращения запросов на очистку? –

+1

@AsifSaeed Мне не интересно ни что иное, кроме приостановки потока или информации о том, выполнимо это или нет. Спасибо в любом случае. –

ответ

1

Вот решение, предложенное создателем библиотеки, отслеживаются в этом Issue:

var tmpArr=[]; 
rs.pipe(csv({},{objectMode:true})).pipe(new Writable({ 
    write: function(json, encoding,callback){ 
    tmpArr.push(json); 
    if (tmpArr.length===10000){ 
     myDb.save(tmpArr,function(){ 
     tmpArr=[]; 
     callback(); 
     }) 
    }else{ 
     callback(); 
    } 
    } , 
    objectMode:true 
})) 
.on('finish',function(){ 
    if (tmpArr.length>0){ 
    myDb.save(tmpArr,function(){ 
     tmpArr=[]; 
    }) 
    } 
}) 

Я на самом деле удалось эмулировать делая паузу, создавая таким образом, но это не идеально:

let count = 0; 
var csvParser=csv() 
.fromStream(rs) 
.on("json", (json) => { 
    rows.push(json); 
    if (rows.length % 1000 === 0) { 
    rs.unpipe(); 
    // clear `rows` right after `unpipe` 
    const entries = rows; 
    rows = []; 
    this._insertEntries(db, entries,()=> { 
     rs.pipe(csvParser); 
    }); 
    } 
}) 
+0

Использование записываемого потока - хорошая идея, чтобы иметь возможность приостанавливать его и делать такие вещи, как обновления баз данных между , Спасибо, что поделился! – Johnny

1

Вы не можете сделать это, если вы не измените библиотеку csv2json.

Это ссылка, которую вы должны прочитать первый
https://nodejs.org/dist/latest-v6.x/docs/api/stream.html#stream_three_states

поток находится в режиме паузы, когда вы сделали rs.pause(). Infact, даже если вы этого не сделаете, читаемый поток начинается в режиме паузы.

Поток переходит в resume под 3 сценариями.

  • Либо есть прослушиватель .on('data') события или
  • есть .pipe() метод прилагаются или
  • readable.resume() называется явным.

В вашем случае метод fromStream() имеет pipe метод, прикрепленный к вашему читаемый поток, который, таким образом, возобновленного поток.

Ссылка Код:
https://github.com/Keyang/node-csvtojson/blob/master/libs/core/Converter.js#L378

Converter.prototype.fromStream=function(readStream,cb){ 
    if (cb && typeof cb ==="function"){ 
    this.wrapCallback(cb); 
    } 
    process.nextTick(function(){ 
    readStream.pipe(this); 
    }.bind(this)) 
    return this; 
} 
0

Я воспользовался тем, что у csvtojson также есть метод fromString(...) и использовал его как показано ниже.

  1. Используйте пакет line-by-line для чтения фиксированного количества строк, то есть 10000, и сохраните их в массиве.
  2. пауза линейный считыватель с использованием lr.pause().
  3. вставки заголовков строки (если ваш файл CSV имеет строку заголовка, то с помощью простого условного оператора игнорировать первую строку, возвращаемую строку за строкой чтения) с индексом 0.
  4. соединить все линии с EOL характером, который даст вам строковое представление 10000 строк этого CSV-файла.
  5. использовать csvtojson's .fromString(...), чтобы преобразовать строковое представление блока в json-объекты и вставить их в db.
  6. возобновите поток через lr.resume() и повторите пока линейный считыватель не испустит 'end' событие.

Вот полный код

const CSVToJSON = require("csvtojson"); 
const LineByLineReader = require("line-by-line"); 
const { EOL } = require("os"); 

const BLOCK_LIMIT = 10000; 

let lines = []; 
let isFirstLineProcessed = false; 

const lr = new LineByLineReader("./foo.csv"); 

lr 
.on("line", (line) => { 

    // remove this if statement if your CSV does not contain headers line 
    if (!isFirstLineProcessed) { 
     isFirstLineProcessed = true; 
     return; 
    } 

    lines.push(line); 

    if (lines.length === BLOCK_LIMIT) { 
     lr.pause(); 

     // insert headers string ("field1, field2, ...") at index 0; 
     lines.splice(0, 0, headers); 

     // join all lines using newline operator ("\n") to form a valid csv string 
     const csvBlockString = lines.join(EOL); 
     const entries = []; 

     lines = [];  

     csv() 
      .fromString(csvBlockString) 
      .on("json", (json) => { 
       entries.push(json); 
      }) 
      .on("done",() => { 
       this._insertEntries(db, entries,()=> { 
        lr.resume(); 
       }); 
      }); 
    } 
}) 
.on("end",() => { 
    console.log("done"); 
});