2015-03-20 2 views
1

Я боролся с этим в течение двух дней прямо с моим низким знанием с C++. Мне нужно проанализировать последовательности сообщений, используя protobuf C++ API из большого файла, файл, который может содержать миллионы таких сообщений. Чтение прямо из файла легко, так как я всегда могу просто «ReadVarInt32» получить размер, а затем сделать ParseFromCodedStream с ограничением, надавленным на CodedInputStream, как описано в this post. Тем не менее, API уровня ввода/вывода, с которым я работаю (на самом деле libuv), требуется фиксированный размер буфера, выделяемый для каждого действия обратного вызова чтения. По-видимому, размер блока не имеет ничего общего с размером сообщения, которое я читаю.Последовательность парсетов сообщений protobuf из узких кусков буфера с фиксированным размером байта

Это делает мою жизнь трудной. В основном каждый раз, когда я читаю из файла и заполняю буфер фиксированного размера (скажем, 16K), этот буфер, вероятно, содержит сотни полных сообщений protobuf, но последний фрагмент этого буфера, скорее всего, будет неполным. Поэтому я подумал: «Хорошо, что я должен делать, это попытаться прочитать столько сообщений, сколько я могу, и в конце извлеките последний кусок и присоедините его к началу следующего буфера 16K, который я зачитал, продолжайте, пока я не получу EOF файл. Я использую ReadVarInt32() для получения размера, а затем сравниваю это число с остальной частью размера буфера, если размер сообщения меньше, он продолжает читать.

Этот API называется GetDirectBufferPointer, поэтому я пытаюсь использовать его для записи позиции указателя до. Я даже прочитал размер следующего сообщения. Однако я подозреваю, что из-за необычности странности, если я просто извлекаю оставшийся массив байтов, откуда начинается указатель и присоединяется к следующему фрагменту, Parse не удастся, и на самом деле первые несколько байтов (8, я думаю) просто полностью перепутались ,

В качестве альтернативы, если я кодировалStream.ReadRaw() и записывает остаточный поток в буфер, а затем прикрепляется к голове нового блока, данные не будут повреждены. Но проблема в том, что на этот раз я потеряю «размерную» байтовую информацию, поскольку она уже «читается» в «ReadVarInt32»! И даже если я просто пойду и запомню информацию о размере, которую я прочитал в прошлый раз, и сразу вызову в следующем итерационном сообщении. ParseFromCodedStream(), он закончил чтение одного меньшего байта, а часть даже повреждена и не может восстановить объект успешно.

std::vector<char> mCheckBuffer; 
std::vector<char> mResidueBuffer; 
char bResidueBuffer[READ_BUFFER_SIZE]; 
char temp[READ_BUFFER_SIZE]; 
google::protobuf::uint32 size; 
//"in" is the file input stream 
while (in.good()) { 
    in.read(mReadBuffer.data(), READ_BUFFER_SIZE); 
    mCheckBuffer.clear(); 
    //merge the last remaining chunk that contains incomplete message with 
    //the new data chunk I got out from buffer. Excuse my terrible C++ foo 
    std::merge(mResidueBuffer.begin(), mResidueBuffer.end(), 
    mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer)); 

    //Treat the new merged buffer array as the new CIS 
    google::protobuf::io::ArrayInputStream ais(&mCheckBuffer[0], 
    mCheckBuffer.size()); 
    google::protobuf::io::CodedInputStream cis(&ais); 
    //Record the pointer location on CIS in bResidueBuffer 
    cis.GetDirectBufferPointer((const void**)&bResidueBuffer, 
    &bResidueBufSize); 

    //No size information, probably first time or last iteration 
    //coincidentally read a complete message out. Otherwise I simply 
    //skip reading size again as I've already populated that from last 
    //iteration when I got an incomplete message 
    if(size == 0) { 
     cis.ReadVarint32(&size); 
    } 
    //Have to read this again to get remaining buffer size 
    cis.GetDirectBufferPointer((const void**)&temp, &mResidueBufSize); 

    //Compare the next message size with how much left in the buffer, if  
    //message size is smaller, I know I can read at least one more message 
    //out, keep reading until I run out of buffer, or, it's the end of message 
    //and my buffer just allocated larger so size should be 0 
    while (size <= mResidueBufSize && size != 0) { 
     //If this cis I constructed didn't have the size info at the beginning, 
     //and I just read straight from it hoping to get the message out from 
     //the "size" I got from last iteration, it simply doesn't work 
     //(read one less byte in fact, and some part of the message corrupted) 
     //push the size constraint to the input stream; 
     int limit = cis.PushLimit(size); 
     //parse message from the input stream 
     message.ParseFromCodedStream(&cis); 
     cis.PopLimit(limit); 
     google::protobuf::TextFormat::PrintToString(message, &str); 
     printf("%s", str.c_str()); 
     //do something with the parsed object 
     //Now I have to record the new pointer location again 
     cis.GetDirectBufferPointer((const void**)&bResidueBuffer, 
     &bResidueBufSize); 
     //Read another time the next message's size and go back to while loop check 
     cis.ReadVarint32(&size); 

    } 
    //If I do the next line, bResidueBuffer will have the correct CIS information 
    //copied over, but not having the "already read" size info 
    cis.ReadRaw(bResidueBuffer, bResidueBufSize); 
    mResidueBuffer.clear(); 
    //I am constructing a new vector that receives the residual chunk of the 
    //current buffer that isn't enough to restore a message 
    //If I don't do ReadRaw, this copy completely messes up at least the first 8 
    //bytes of the copied buffer's value, due to I suspect endianness 
    mResidueBuffer.insert(mResidueBuffer.end(), &bResidueBuffer[0], 
    &bResidueBuffer[bResidueBufSize]); 
} 

Я действительно не в курсе. Можно ли изящно использовать protobuf с API-интерфейсами, для которых требуется промежуточный буфер фиксированного размера? Любые входы очень ценятся, спасибо!

ответ

1

Я вижу две основные проблемы с кодом:

std::merge(mResidueBuffer.begin(), mResidueBuffer.end(), 
mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer)); 

Похоже, что вы ожидаете std::merge конкатенировать свои буферы, но на самом деле эта функция выполняет слияние двух отсортированных массивов в один отсортированный массив в смысл MergeSort. Это не имеет никакого смысла в этом контексте; mCheckBuffer в конечном итоге будет содержать бессмыслицу.

cis.GetDirectBufferPointer((const void**)&bResidueBuffer, 
&bResidueBufSize); 

Здесь вы бросаете &bResidueBuffer к несовместимый тип указателя. bResidueBuffer - массив символов, поэтому &bResidueBuffer является указателем на массив символов, который является не указателем на указатель. Это, по общему признанию, путается, потому что массивы могут быть неявно преобразованы в указатели (где указатель указывает на первый элемент массива), но на самом деле это преобразование. bResidueBuffer сам является не указателем, он может быть просто преобразован в один ,

Я думаю, что вы также не понимаете, что GetDirectBufferPointer().Похоже, вы хотите, чтобы он копировал остальную часть буфера в bResidueBuffer, но метод никогда не копирует никаких данных. Метод возвращает указатель, указывающий на исходный буфер.

правильный способ назвать это что-то вроде:

const void* ptr; 
int size; 
cis.GetDirectBufferPointer(&ptr, &size); 

Теперь ptr укажут в исходный буфер. Вы могли теперь сравнить с указателем на начало буфера, чтобы выяснить, где вы находитесь в потоке, как:

size_t pos = (const char*)ptr - &mCheckBuffer[0]; 

Но, вы не должны делать это, потому что CodedInputStream уже есть метод CurrentPosition() для именно этой цели. Это вернет текущее смещение байта в буфере. Итак, используйте это вместо этого.

+0

Спасибо! Я буду более внимательно читать остальную часть ваших комментариев ... вещь std: merge - определенно отличный улов! Я знал метод CurrentPosition(), но мне нужно скопировать данные, поэтому мне нужно получить указатель. А также, я не пытаюсь использовать GetDirectBufferPointer() для копирования данных, все, что я пытался сделать, это получить информацию об указателе и размере и на самом деле сделать копирование после --- где я назвал метод ReadRaw, а затем " вставить». И с этим у меня есть самая большая проблема, которую я должен задать, потому что информация о байтах размером «size» уже исчезла и не была скопирована в этот момент. – Superziyi

+0

'cis.CurrentPosition()' возвращает позицию внутри 'mCheckBuffer'. Например. вы можете получить указатель, указав '& mCheckBuffer [cis.CurrentPosition()]'. Поэтому вам не нужно использовать 'GetDirectBuferPointer()'. –

+0

Подождите, я полностью забыл, что весь СНГ по-прежнему поддерживается mCheckBuffer! Таким образом, я должен иметь возможность получить произвольный индекс и буфер копирования по этому пути! Позвольте мне больше исследовать остальные ... может быть, есть выход, сохраняющий уже прочитанную информацию о размере ... Большое спасибо! – Superziyi

0

Благодарим Kenton за помощь в указании основных вопросов в моем вопросе. Теперь я пересмотрел часть кода и протестировал ее работу. Я отправлю свое решение здесь. Тем не менее, я не испытываю радости от всех сложностей и проверок, которые мне нужно было сделать здесь. Я думаю, что это склонность к ошибкам. Даже с этим, что я, вероятно, буду делать по-настоящему, я пишу прямое «чтение из потока», блокируя вызов в другом потоке вне моего основного потока libuv, поэтому я не требую использования libuv API. Но для полноты, вот мой код:

std::vector<char> mCheckBuffer; 
std::vector<char> mResidueBuffer; 
std::vector<char> mReadBuffer(READ_BUFFER_SIZE); 
google::protobuf::uint32 size; 
//"in" is the file input stream 
while (in.good()) { 
    //This part is tricky as you're not guaranteed that what end up in 
    //mReadBuffer is everything you read out from the file. The same 
    //happens with libuv's assigned buffer, after EOF, what's rest in 
    //the buffer could be anything 
    in.read(mReadBuffer.data(), READ_BUFFER_SIZE); 
    //merge the last remaining chunk that contains incomplete message with 
    //the new data chunk I got out from buffer. I couldn't find a more 
    //efficient way doing that 
    mCheckBuffer.clear(); 
    mCheckBuffer.reserve(mResidueBuffer.size() + mReadBuffer.size()); 
    mCheckBuffer.insert(mCheckBuffer.end(), mResidueBuffer.begin(), 
    mResidueBuffer.end()); 
    mCheckBuffer.insert(mCheckBuffer.end(), mReadBuffer.begin(), 
    mReadBuffer.end()); 
    //Treat the new merged buffer array as the new CIS 
    google::protobuf::io::ArrayInputStream ais(&mCheckBuffer[0], 
    mCheckBuffer.size()); 
    google::protobuf::io::CodedInputStream cis(&ais); 
    //No size information, probably first time or last iteration 
    //coincidentally read a complete message out. Otherwise I simply 
    //skip reading size again as I've already populated that from last 
    //iteration when I got an incomplete message 
    if(size == 0) { 
     cis.ReadVarint32(&size); 
    } 
    bResidueBufSize = mCheckBuffer.size() - cis.CurrentPosition(); 
    //Compare the next message size with how much left in the buffer, if  
    //message size is smaller, I know I can read at least one more message 
    //out, keep reading until I run out of buffer. If, it's the end of message 
    //and size (next byte I read from stream) happens to be 0, that 
    //will trip me up, cos when I push size 0 into PushLimit and then try 
    //parsing, it will actually return true even if it reads nothing. 
    //So I can get into an infinite loop, if I don't do the check here 
    while (size <= bResidueBufSize && size != 0) { 
     //If this cis I constructed didn't have the size info at the 
     //beginning, and I just read straight from it hoping to get the 
     //message out from the "size" I got from last iteration 
     //push the size constraint to the input stream 
     int limit = cis.PushLimit(size); 
     //parse the message from the input stream 
     bool result = message.ParseFromCodedStream(&cis); 
     //Parse fail, it could be because last iteration already took care 
     //of the last message and that size I read last time is just junk 
     //I choose to only check EOF here when result is not true, (which 
     //leads me to having to check for size=0 case above), cos it will 
     //be too many checks if I check it everytime I finish reading a 
     //message out 
     if(!result) { 
      if(in.eof()) { 
       log.info("Reached EOF, stop processing!"); 
       break; 
      } 
      else { 
       log.error("Read error or input mal-formatted! Log error!"); 
       exit; 
      } 
     } 
     cis.PopLimit(limit); 
     google::protobuf::TextFormat::PrintToString(message, &str); 
     //Do something with the message 

     //This is when the last message read out exactly reach the end of 
     //the buffer and there is no size information available on the 
     //stream any more, in which case size will need to be reset to zero 
     //so that the beginning of next iteration will read size info first 
     if(!cis.ReadVarint32(&size)) { 
      size = 0; 
     } 
     bResidueBufSize = mCheckBuffer.size() - cis.CurrentPosition(); 
    } 
    if(in.eof()) { 
     break; 
    } 
    //Now I am copying the residual buffer into the intermediate 
    //mResidueBuffer, which will be merged with newly read data in next iteration 
    mResidueBuffer.clear(); 
    mResidueBuffer.reserve(bResidueBufSize); 
    mResidueBuffer.insert(mResidueBuffer.end(), 
    &mCheckBuffer[cis.CurrentPosition()],&mCheckBuffer[mCheckBuffer.size()]); 
} 
if(!in.eof()) { 
    log.error("Something else other than EOF happened to the file, log error!"); 
    exit; 
} 

 Смежные вопросы

  • Нет связанных вопросов^_^