Я боролся с этим в течение двух дней прямо с моим низким знанием с C++. Мне нужно проанализировать последовательности сообщений, используя protobuf C++ API из большого файла, файл, который может содержать миллионы таких сообщений. Чтение прямо из файла легко, так как я всегда могу просто «ReadVarInt32» получить размер, а затем сделать ParseFromCodedStream с ограничением, надавленным на CodedInputStream, как описано в this post. Тем не менее, API уровня ввода/вывода, с которым я работаю (на самом деле libuv), требуется фиксированный размер буфера, выделяемый для каждого действия обратного вызова чтения. По-видимому, размер блока не имеет ничего общего с размером сообщения, которое я читаю.Последовательность парсетов сообщений protobuf из узких кусков буфера с фиксированным размером байта
Это делает мою жизнь трудной. В основном каждый раз, когда я читаю из файла и заполняю буфер фиксированного размера (скажем, 16K), этот буфер, вероятно, содержит сотни полных сообщений protobuf, но последний фрагмент этого буфера, скорее всего, будет неполным. Поэтому я подумал: «Хорошо, что я должен делать, это попытаться прочитать столько сообщений, сколько я могу, и в конце извлеките последний кусок и присоедините его к началу следующего буфера 16K, который я зачитал, продолжайте, пока я не получу EOF файл. Я использую ReadVarInt32() для получения размера, а затем сравниваю это число с остальной частью размера буфера, если размер сообщения меньше, он продолжает читать.
Этот API называется GetDirectBufferPointer, поэтому я пытаюсь использовать его для записи позиции указателя до. Я даже прочитал размер следующего сообщения. Однако я подозреваю, что из-за необычности странности, если я просто извлекаю оставшийся массив байтов, откуда начинается указатель и присоединяется к следующему фрагменту, Parse не удастся, и на самом деле первые несколько байтов (8, я думаю) просто полностью перепутались ,
В качестве альтернативы, если я кодировалStream.ReadRaw() и записывает остаточный поток в буфер, а затем прикрепляется к голове нового блока, данные не будут повреждены. Но проблема в том, что на этот раз я потеряю «размерную» байтовую информацию, поскольку она уже «читается» в «ReadVarInt32»! И даже если я просто пойду и запомню информацию о размере, которую я прочитал в прошлый раз, и сразу вызову в следующем итерационном сообщении. ParseFromCodedStream(), он закончил чтение одного меньшего байта, а часть даже повреждена и не может восстановить объект успешно.
std::vector<char> mCheckBuffer;
std::vector<char> mResidueBuffer;
char bResidueBuffer[READ_BUFFER_SIZE];
char temp[READ_BUFFER_SIZE];
google::protobuf::uint32 size;
//"in" is the file input stream
while (in.good()) {
in.read(mReadBuffer.data(), READ_BUFFER_SIZE);
mCheckBuffer.clear();
//merge the last remaining chunk that contains incomplete message with
//the new data chunk I got out from buffer. Excuse my terrible C++ foo
std::merge(mResidueBuffer.begin(), mResidueBuffer.end(),
mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer));
//Treat the new merged buffer array as the new CIS
google::protobuf::io::ArrayInputStream ais(&mCheckBuffer[0],
mCheckBuffer.size());
google::protobuf::io::CodedInputStream cis(&ais);
//Record the pointer location on CIS in bResidueBuffer
cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
&bResidueBufSize);
//No size information, probably first time or last iteration
//coincidentally read a complete message out. Otherwise I simply
//skip reading size again as I've already populated that from last
//iteration when I got an incomplete message
if(size == 0) {
cis.ReadVarint32(&size);
}
//Have to read this again to get remaining buffer size
cis.GetDirectBufferPointer((const void**)&temp, &mResidueBufSize);
//Compare the next message size with how much left in the buffer, if
//message size is smaller, I know I can read at least one more message
//out, keep reading until I run out of buffer, or, it's the end of message
//and my buffer just allocated larger so size should be 0
while (size <= mResidueBufSize && size != 0) {
//If this cis I constructed didn't have the size info at the beginning,
//and I just read straight from it hoping to get the message out from
//the "size" I got from last iteration, it simply doesn't work
//(read one less byte in fact, and some part of the message corrupted)
//push the size constraint to the input stream;
int limit = cis.PushLimit(size);
//parse message from the input stream
message.ParseFromCodedStream(&cis);
cis.PopLimit(limit);
google::protobuf::TextFormat::PrintToString(message, &str);
printf("%s", str.c_str());
//do something with the parsed object
//Now I have to record the new pointer location again
cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
&bResidueBufSize);
//Read another time the next message's size and go back to while loop check
cis.ReadVarint32(&size);
}
//If I do the next line, bResidueBuffer will have the correct CIS information
//copied over, but not having the "already read" size info
cis.ReadRaw(bResidueBuffer, bResidueBufSize);
mResidueBuffer.clear();
//I am constructing a new vector that receives the residual chunk of the
//current buffer that isn't enough to restore a message
//If I don't do ReadRaw, this copy completely messes up at least the first 8
//bytes of the copied buffer's value, due to I suspect endianness
mResidueBuffer.insert(mResidueBuffer.end(), &bResidueBuffer[0],
&bResidueBuffer[bResidueBufSize]);
}
Я действительно не в курсе. Можно ли изящно использовать protobuf с API-интерфейсами, для которых требуется промежуточный буфер фиксированного размера? Любые входы очень ценятся, спасибо!
Спасибо! Я буду более внимательно читать остальную часть ваших комментариев ... вещь std: merge - определенно отличный улов! Я знал метод CurrentPosition(), но мне нужно скопировать данные, поэтому мне нужно получить указатель. А также, я не пытаюсь использовать GetDirectBufferPointer() для копирования данных, все, что я пытался сделать, это получить информацию об указателе и размере и на самом деле сделать копирование после --- где я назвал метод ReadRaw, а затем " вставить». И с этим у меня есть самая большая проблема, которую я должен задать, потому что информация о байтах размером «size» уже исчезла и не была скопирована в этот момент. – Superziyi
'cis.CurrentPosition()' возвращает позицию внутри 'mCheckBuffer'. Например. вы можете получить указатель, указав '& mCheckBuffer [cis.CurrentPosition()]'. Поэтому вам не нужно использовать 'GetDirectBuferPointer()'. –
Подождите, я полностью забыл, что весь СНГ по-прежнему поддерживается mCheckBuffer! Таким образом, я должен иметь возможность получить произвольный индекс и буфер копирования по этому пути! Позвольте мне больше исследовать остальные ... может быть, есть выход, сохраняющий уже прочитанную информацию о размере ... Большое спасибо! – Superziyi