2016-11-11 4 views
0

Я пытаюсь запустить кластер, который будет передавать потоки файлов (новая строка с разделителями JSON) из облачного хранилища Google и преобразовывать каждую строку после извлечения данных из MongoDB. После преобразования строки я хочу сохранить ее в bigquery Google - 10000 строк за раз. Все это работает нормально, но проблема в том, что скорость обработки потоковых файлов со временем значительно уменьшается.Node.js Считываемый поток замедляется со временем, загрузка процессора падает

Я установил приложение узла на одном сервере и mongodb на другое. Оба 8-ядерных машин с 30 ГБ оперативной памяти. Когда сценарий выполняется, изначально использование ЦП для сервера приложений и сервера mongodb составляет около 70% -75%. Через 30 минут загрузка процессора падает до 10%, а затем, наконец, 1%. Приложение не генерирует исключений. Я вижу журнал приложений и обнаружил, что он закончил обработку нескольких файлов и занялся новыми файлами для обработки. Одно исполнение может наблюдаться ниже чуть позже 3:00 вечера и почти до 5:20 вечера.

CPU Usage Disk Usage

var cluster = require('cluster'), 
    os = require('os'), 
    numCPUs = os.cpus().length, 
    async = require('async'), 
    fs = require('fs'), 
    google = require('googleapis'), 
    bigqueryV2 = google.bigquery('v2'), 
    gcs = require('@google-cloud/storage')({ 
     projectId: 'someproject', 
     keyFilename: __dirname + '/auth.json' 
    }), 
    dataset = bigquery.dataset('somedataset'), 
    bucket = gcs.bucket('somebucket.appspot.com'), 
    JSONStream = require('JSONStream'), 
    Transform = require('stream').Transform, 
    MongoClient = require('mongodb').MongoClient, 
    mongoUrl = 'mongodb://localhost:27017/bigquery', 
    mDb, 
    groupA, 
    groupB; 

var rows = [], 
    rowsLen = 0; 

function transformer() { 

    var t = new Transform({ objectMode: true }); 

    t._transform = function(row, encoding, cb) { 
     // Get some information from mongodb and attach it to the row 
     if (row) { 
      groupA.findOne({ 
       'geometry': { $geoIntersects: { $geometry: { type: 'Point', coordinates: [row.lon, row.lat] } } } 
      }, { 
       fields: { 'properties.OA_SA': 1, _id: 0 } 
      }, function(err, a) { 
       if (err) return cb(); 
       groupB.findOne({ 
        'geometry': { $geoIntersects: { $geometry: { type: 'Point', coordinates: [row.lon, row.lat] } } } 
       }, { 
        fields: { 'properties.WZ11CD': 1, _id: 0 } 
       }, function(err, b) { 
        if (err) return cb(); 
        row.groupA = a ? a.properties.OA_SA : null; 
        row.groupB = b ? b.properties.WZ11CD : null; 

        // cache processed rows in memory 
        rows[rowsLen++] = { json: row }; 

        if (rowsLen >= 10000) { 
         // batch insert rows in bigquery table 
         // and free memory 
         log('inserting 10000') 
         insertRowsAsStream(rows.splice(0, 10000)); 
         rowsLen = rows.length; 
        } 

        cb(); 
       }); 
      }); 
     } else { 
      cb(); 
     } 
    }; 

    return t; 
} 

var log = function(str) { 
    console.log(str); 
} 

function insertRowsAsStream(rows, callback) { 
    bigqueryV2.tabledata.insertAll({ 
     "projectId": 'someproject', 
     "datasetId": 'somedataset', 
     "tableId": 'sometable', 
     "resource": { 
      "kind": "bigquery#tableDataInsertAllRequest", 
      "rows": rows 
     } 
    }, function(err, res) { 
     if (res && res.insertErrors && res.insertErrors.length) { 
      console.log(res.insertErrors[0].errors) 
      err = err || new Error(JSON.stringify(res.insertErrors)); 
     } 
    }); 
} 


function startStream(fileName, cb) { 
    // stream a file from Google cloud storage 
    var file = bucket.file(fileName), 
     called = false; 

    log(`Processing file ${fileName}`); 

    file.createReadStream() 
     .on('data', noop) 
     .on('end', function() { 
      if (!called) { 
       called = true; 
       cb(); 
      } 
     }) 
     .pipe(JSONStream.parse()) 
     .pipe(transformer()) 
     .on('finish', function() { 
      log('transformation ended'); 
      if (!called) { 
       called = true; 
       cb(); 
      } 
     }); 
} 

function processFiles(files, cpuIdentifier) { 
    if (files.length == 0) return; 
    var fn = []; 

    for (var i = 0; i < files.length; i++) { 
     fn.push(function(cb) { 
      startStream(files.pop(), cb); 
     }); 
    } 

    // process 3 files in parallel 
    async.parallelLimit(fn, 3, function() { 
     log(`child process ${cpuIdentifier} completed the task`); 
     fs.appendFile(__dirname + '/complete_count.txt', '1'); 
    }); 
} 

if (cluster.isMaster) { 
    for (var ii = 0; ii < numCPUs; ii++) { 
     cluster.fork(); 
    } 
} else { 
    MongoClient.connect(mongoUrl, function(err, db) { 
     if (err) throw (err); 
     mDb = db; 
     groupA = mDb.collection('groupageo'); 
     groupB = mDb.collection('groupbgeo'); 
     processFiles(files, process.pid); 
     // `files` is an array of file names 
     // each file is in newline json delimited format 
     // ["1478854974993/000000000000.json","1478854974993/000000000001.json","1478854974993/000000000002.json","1478854974993/000000000003.json","1478854974993/000000000004.json","1478854974993/000000000005.json"] 
    }); 
} 
+0

Как было использование ОЗУ и жесткого диска? –

+0

Я предполагаю, что оперативная память была в порядке, потому что я не получил ошибку распределения памяти/GC. Почему HD будет проблемой в этом решении? –

+0

Не получать ошибки распределения памяти не означает, что с этим нет проблем. Чрезмерное использование ОЗУ вызовет использование памяти подкачки, что, в свою очередь, будет использовать жесткий диск. –

ответ

0

Хорошо, я нашел виновника! В клиентской библиотеке API Google API Node.js используется модуль, называемый «потоковыми событиями», который реализует Streams 0.8. Потоки 0.8 не контролируют скорость, с которой он испускает событие «данные», исходя из способности потребителя потреблять данные. Функция контроля скорости была введена в Streams 1.0. Таким образом, это означало, что читаемый поток метал данные в MongoDB со скоростью, которую он не смог обработать.

Я использовал модуль «запрос» вместо клиентской библиотеки Google. Я подал подписанный URL-адрес в модуль запроса, который, в свою очередь, получал результаты как поток, который я мог бы транслировать в свой трансформатор.

Убежать: Всегда проверяйте модули, которые вы используете для версий потоков, которые они используют.