2016-07-16 8 views
-1

Я разрабатываю приложение C++ (в VS2012, Windows Server 2012 R2), которое записывает большие объемы двоичных данных из циклических массивов буферов, которые были выделены, в сырые файлы. Дело в том, что использование системной ОЗУ, сообщаемое диспетчером задач Windows, увеличивается с линейной скоростью, так как fwrite записывает данные в файлы до тех пор, пока не достигнет определенной точки, где она остается почти постоянной (см. Также следующее изображение). Кроме того, память, используемая моим приложением, остается постоянной все время.Почему диспетчер задач Windows показывает увеличение памяти при записи очень больших файлов? Я должен беспокоиться?

resource monitor - memory

Я называю fflush периодически, и это не имеет никакого эффекта. Хотя это кажется безобидным, я обеспокоен этой проблемой с точки зрения производительности, поскольку другое приложение Java также будет работать в номинальной работе.

Поэтому я хотел бы спросить, не стоит ли мне беспокоиться об этом, и если есть способ избежать этой проблемы для достижения наилучшей производительности для системы записи данных в режиме реального времени.

Аналогичные вопросы были заданы here и here для операционных систем linux, и было сказано, что система может выделять объем памяти для кэширования данных, при условии наличия достаточного объема памяти.

Далее представлена ​​часть заявки. Короче говоря, приложение управляет двумя камерами, и каждый из них получает фреймы и сохраняет их в правильно выровненных выделенных буферах. Есть i) класс CameraInterface, который создает два «продюсерских» потока, ii) класс Recorder, который создает два «потребительских» потока и iii) класс SharedMemoryManager, который предоставляет производителю доступный буфер для хранения данных и пользователя с следующий буфер, который будет записан в файл. SharedMemoryManager содержит два массива буферов (по одному для каждой пары производителей-потребителей) и два соответствующих массива флагов, которые указывают состояние буфера. Он также содержит два объекта std::queue для быстрого доступа к следующим буферам для записи. Далее показаны части рекордера и SharedMemoryManager.

// somewhere in file "atcore.h"... 
typedef unsigned char AT_U8; 

// File: SharedMemoryManager.h 
#ifndef __MEM_MANAGER__ 
#define __MEM_MANAGER__ 

#pragma once 

#include "atcore.h" 
#include <queue> 
#include <mutex> 

#define NBUFFERS 128 

#define BUFFER_AVAILABLE 0 
#define BUFFER_QUEUED 1 
#define BUFFER_FULL 2 
#define BUFFER_RECORDING_PENDING 3 

// the status flag cycle is 
// EMPTY -> QUEUED -> FULL -> RECORDING_PENDING -> EMPTY 

using namespace std; 

typedef struct{ 
    AT_U8** buffers; 
    int* flags; 
    int acquiredCounter; 
    int consumedCounter; 
    int queuedCounter; 
    mutex flagMtx; 
} sharedMemory; 

typedef struct{ 
    AT_U8* buffer; 
    int bufSize; 
    int index; 
} record; 

class SharedMemoryManager 
{ 
public: 
    SharedMemoryManager(); 
    ~SharedMemoryManager(void); 

    void enableRecording(); 
    void disableRecording(); 
    int setupMemory(int cameraIdentifier, int bufferSize); 
    void freeMemory(); 
    void freeCameraMemory(int cameraIdentifier); 
    int getBufferSize(int cameraIdentifier); 
    AT_U8* getBufferForCameraQueue(int cameraIdentifier); // get pointer to the  next available buffer for queueing in the camera 
    int hasFramesForRecording(int cameraIdentifier); // ask how many frames for  recording are there in the respective queue 
    AT_U8* getNextFrameForRecording(int cameraIdentifier); // get pointer to the  next buffer to be recorded to a file 
    void copyMostRecentFrame(unsigned char* buffer, int cameraIdentifier); //  TODO // get a copy of the most recent frame on the buffer 
    void notifyAcquiredFrame(AT_U8* buffer, int bufSize, int cameraIdentifier);  // use this function to notify the manager that the buffer has just been filled with  data 
    void notifyRecordedFrame(AT_U8* buffer, int cameraIdentifier); // use this function to notify the manager that the buffer has just been written to file and can be used again 

private: 
    bool useMem0, useMem1; 
    int bufSize0, bufSize1; 
    sharedMemory* memory0; 
    sharedMemory* memory1; 
    queue<record*> framesForRecording0; 
    queue<record*> framesForRecording1; 
    bool isRecording; 

    int allocateBuffers(sharedMemory* mem, int bufSize); 
    void freeBufferArray(sharedMemory* mem); 
}; 

#endif // !__MEM_MANAGER 


// File: SharedMemoryManager.cpp 
... 
int SharedMemoryManager::hasFramesForRecording(int cameraIdentifier){ 
    if (cameraIdentifier!=0 && cameraIdentifier!=1){ 
     cout << "Could not get the number of frames in the shared memory. Invalid camera id " << cameraIdentifier << endl; 
     return -1; 
    } 

    if (cameraIdentifier==0){ 
     return (int)framesForRecording0.size(); 
    } 
    else{ 
     return (int)framesForRecording1.size(); 
    } 
} 

AT_U8* SharedMemoryManager::getNextFrameForRecording(int cameraIdentifier){ 
    if (cameraIdentifier!=0 && cameraIdentifier!=1){ 
     cout << "Error in getNextFrameForRecording. Invalid camera id " <<  cameraIdentifier << endl; 
     return NULL; 
    } 

    sharedMemory* mem; 
    if (cameraIdentifier==0) mem=memory0; 
    else mem=memory1; 

    queue<record*>* framesQueuePtr; 
    if (cameraIdentifier==0) framesQueuePtr = &framesForRecording0; 
    else framesQueuePtr = &framesForRecording1; 

    if (framesQueuePtr->empty()){ // no frames to be recorded at the moment 
     return NULL; 
    } 

    record* item; 
    int idx; 
    AT_U8* buffer = NULL; 

    item = framesQueuePtr->front(); 
    framesQueuePtr->pop(); 
    idx = item->index; 
    delete item; 
    mem->flagMtx.lock(); 
    if (mem->flags[idx] == BUFFER_FULL){ 
     mem->flags[idx] = BUFFER_RECORDING_PENDING; 
     buffer = mem->buffers[idx]; 
    } 
    else{ 
     cout << "PROBLEM. Buffer in getBufferForRecording. Buffer flag is " <<  mem->flags[idx] << endl; 
     cout << "----- BUFFER FLAGS -----" << endl; 
     for (int i=0; i<NBUFFERS; i++){ 
      cout << "[" << i << "] " << mem->flags[i] << endl; 
     } 
     cout << "----- -----" << endl; 
    } 
    mem->flagMtx.unlock(); 
    return buffer; 
} 

int SharedMemoryManager::allocateBuffers(sharedMemory* mem, int bufSize){ 
    // allocate the array for the buffers 
    mem->buffers = (AT_U8**)calloc(NBUFFERS,sizeof(AT_U8*)); 
    if (mem->buffers==NULL){ 
     cout << "Could not allocate array of buffers." << endl; 
     return -1; 
    } 
    // allocate the array for the respective flags 
    mem->flags = (int*)malloc(NBUFFERS*sizeof(int)); 
    if (mem->flags==NULL){ 
     cout << "Could not allocate array of flags for the buffers." << endl; 
     free(mem->buffers); 
     return -1; 
    } 

    int i; 
    for (i=0; i<NBUFFERS; i++){ // allocate the buffers 
     mem->buffers[i] = (AT_U8*)_aligned_malloc((size_t)bufSize,8); 
     if (mem->buffers[i] == NULL){ 
      cout << "Could not allocate memory for buffer no. " << i << endl; 
      for (int j=0; j<i; j++){ // free the previously allocated buffers 
       _aligned_free(mem->buffers[j]); 
      } 
      free(mem->buffers); 
      free(mem->flags); 
      return -1; 
     } 
     else{ 
      mem->flags[i]=BUFFER_AVAILABLE; 
     } 
    } 

    return 0; 
} 

void SharedMemoryManager::freeBufferArray(sharedMemory* mem){ 
    if (mem!=NULL){ 
     for(int i=0; i<NBUFFERS; i++){ 
      _aligned_free(mem->buffers[i]); 
      mem->buffers[i]=NULL; 
     } 
     free(mem->buffers); 
     mem->buffers = NULL; 
     free(mem->flags); 
     mem->flags = NULL; 
     free(mem); 
     mem = NULL; 
    } 
} 

// File: Recorder.h 
#ifndef __RECORDER__ 
#define __RECORDER__ 

#pragma once 

#include <string> 
#include <queue> 
#include <future> 
#include <thread> 
#include "atcore.h" 
#include "SharedMemoryManager.h" 

using namespace std; 

class Recorder 
{ 
public: 
    Recorder(SharedMemoryManager* memoryManager); 
    ~Recorder(); 

    void recordBuffer(AT_U8 *buffer, int bufsize); 
    int setupRecording(string filename0, string filename1, bool open0, bool open1); 
    void startRecording(); 
    void stopRecording(); 
    int testWriteSpeed(string directoryPath, string filename); 
    void insertFrameItem(AT_U8* buffer, int bufSize, int chunkID); 

private: 
    FILE *chunk0, *chunk1; 
    string chunkFilename0, chunkFilename1; 
    int frameCounter0, frameCounter1; 
    bool writes0, writes1; 
    int bufSize0, bufSize1; 

    static SharedMemoryManager* manager; 

    bool isRecording; 

    promise<int> prom0; 
    promise<int> prom1; 
    thread* recordingThread0; 
    thread* recordingThread1; 

    static void performRecording(promise<int>* exitCode, int chunkIdentifier); 
    void writeNextItem(int chunkIdentifier); 
    void closeFiles(); 

}; 

#endif //!__RECORDER__ 

// File: Recorder.cpp 
#include "Recorder.h" 
#include <ctime> 
#include <iostream> 
using namespace std; 

Recorder* recorderInstance; // keep a pointer to the current instance, for accessing static functions from (non-static) objects in the threads 
SharedMemoryManager* Recorder::manager; // the same reason 
...  
void Recorder::startRecording(){ 
    if (isRecording == false){ // do not start new threads if some are still running 
     isRecording = true; 
     if (writes0==true) recordingThread0 = new thread(&Recorder::performRecording, &prom0, 0); 
     if (writes1==true) recordingThread1 = new thread(&Recorder::performRecording, &prom1, 1); 
    } 
} 

void Recorder::writeNextItem(int chunkIdentifier){ 
    FILE* chunk; 
    AT_U8* buffer; 
    int* bufSize; 
    if (chunkIdentifier==0){ 
     chunk = chunk0; 
     bufSize = &bufSize0; 
     buffer = manager->getNextFrameForRecording(0); 
    } 
    else { 
     chunk = chunk1; 
     bufSize = &bufSize1; 
     buffer = manager->getNextFrameForRecording(1); 
    } 

    size_t nbytes = fwrite(buffer, 1, (*bufSize)*sizeof(unsigned char), chunk); 
    if (nbytes<=0){ 
     cout << "No data were written to file." << endl; 
    } 

    manager->notifyRecordedFrame(buffer,chunkIdentifier); 

    if (chunkIdentifier==0) frameCounter0++; 
    else frameCounter1++; 
} 

void Recorder::performRecording(promise<int>* exitCode, int chunkIdentifier){ 
    bool flag = true; 
    int remaining = manager->hasFramesForRecording(chunkIdentifier); 
    while(recorderInstance->isRecording==true || remaining>0){ 
     if (remaining>0){ 
      if (recorderInstance->isRecording==false){ 
       cout << "Acquisition stopped, still " << remaining << " frames are to be recorded in chunk " << chunkIdentifier << endl; 
      } 
      recorderInstance->writeNextItem(chunkIdentifier); 
     } 
     else{ 
      this_thread::sleep_for(chrono::milliseconds(10)); 
     } 
     remaining = manager->hasFramesForRecording(chunkIdentifier); 
    }  
    cout << "Done recording." << endl; 
} 
+7

Это более вероятно, есть утечка памяти в коде. –

+1

Мы увидим все, кроме самой важной вещи в этой дискуссии - вашей программы. – PaulMcKenzie

+0

Спасибо, капитан Очевидный. –

ответ

1

В использовании памяти снимок экрана Windows, вы показываете, самый большой кусок (45GB) является «кэшировать» из которых 27GB является «модифицированным», что означает «грязные страницы, ожидающие записи на диск». Это нормальное поведение, потому что вы пишете быстрее, чем дисковый ввод-вывод может идти в ногу. flush/fflush не влияет на это, потому что это не в вашем процессе. Как вы заметили: «память, используемая моим приложением, остается постоянной все время». Не беспокойтесь. Однако, если вы действительно не хотите, чтобы ОС буферизует грязные выходные страницы, подумайте о том, чтобы использовать «небуферизованный ввод-вывод», доступный в Windows, так как он сразу записывается на диск.

Редактировать: Некоторые ссылки на небуферизованный ввод-вывод в Windows. Обратите внимание, что небуферизованный ввод-вывод устанавливает ограничения на выравнивание памяти при чтении и записи.

File Buffering

CreateFile function

+0

Обратите внимание, что эти страницы не отображаются в «доступной» сумме, так как ОС не может просто отбросить их, не очистив их сначала. – Wheezil