2015-08-22 6 views
0

Я создал соединение PUB/SUB с использованием zmqpp, и теперь я хочу отправить данные от издателя подписчикам, используя только заголовок , Версия C++ 11 от msgpack-c.Передача данных через ZeroMQ (zmqpp) с использованием MsgPack дает ошибку msgpack :: v1 :: lack_bytes

Издатель должен отправить 2 int64_t номера - header_1 и header_2 - за которыми следует std::vector<T> - data -, где T определяется (header_1, header_2) комбинации.

Sinse не так много примеров того, как объединить msgpack и zmqpp, идея, которую я придумал, состоит в том, чтобы отправить сообщение из 3-х частей, используя zmqpp::message::add/add_raw. Каждая часть будет packed/unpacked с использованием msgpack.

Издатель пакеты одной части данных следующим образом:

zmqpp::message msg; 
int64_t header_1 = 1234567; 
msgpack::sbuffer buffer; 
msgpack::pack(buffer, header_1); 
msg.add(buffer.data(), buffer.size()); 

И приемник распаковывает это так:

zmqpp::message msg; 
subscriberSock.receive(msg); 

int64_t header_1; 
msgpack::unpacked unpackedData; 
// crash ! 
msgpack::unpack(unpackedData, 
       static_cast<const char*>(msg.raw_data(0)), 
       msg.size(0)); 
unpackedData.get().convert(&header_1); 

Когда я запускаю код, я получаю следующее сообщение об ошибке на абонента сторона:

terminate called after throwing an instance of 'msgpack::v1::insufficient_bytes' 
    what(): insufficient bytes 
Aborted 

Также кажется, что zmqpp сгенерировал сообщение из 5 частей, хотя я add() всего 3 раза.

Q1: Я правильно упаковываю/распаковываю данные?

Q2: Правильно ли это метод отправки буферов msgpack с использованием zmqpp?

Вот важные части кода:

Издательство

zmqpp::socket publisherSock; 
/* connection setup stuff ...*/ 

// forever send data to the subscribers 
while(true) 
{ 
    zmqpp::message msg; 

    // meta info about the data 
    int64_t header_1 = 1234567; 
    int64_t header_2 = 89; 
    // sample data 
    std::vector<double> data; 
    data.push_back(1.2); 
    data.push_back(3.4); 
    data.push_back(5.6); 


    { 
     msgpack::sbuffer buffer; 
     msgpack::pack(buffer, header_1); 
     msg.add(buffer.data(), buffer.size()); 
     cout << "header_1:" << header_1 << endl; // header_1:1234567 
    } 

    { 
     msgpack::sbuffer buffer; 
     msgpack::pack(buffer, header_2); 
     msg.add(buffer.data(), buffer.size()); 
     cout << "header_2:" << header_2 << endl; // header_2:89 
    } 

    { 
     msgpack::sbuffer buffer; 
     msgpack::pack(buffer, data); 
     msg.add_raw(buffer.data(), buffer.size()); 
     std::cout << "data: " << data << std::endl; // data:[1.2 3.4 5.6] 
    } 

    std::cout << msg.parts() << " parts" << std::endl; // prints "5 parts"... why ? 
    publisherSock.send(msg); 

    std::this_thread::sleep_for(std::chrono::milliseconds(1000)); 
} 

Абонента

zmqpp::socket subscriberSock; 
/* connection setup stuff ...*/ 

zmqpp::message msg; 
subscriberSock.receive(msg); 

int64_t header_1; 
int64_t header_2; 
std::vector<double> data; 

std::cout << msg.parts() << " parts" << std::endl; // prints "5 parts" 
{ 
    // header 1 
    { 
     msgpack::unpacked unpackedData; 
     // crash ! 
     msgpack::unpack(unpackedData, 
         static_cast<const char*>(msg.raw_data(0)), 
         msg.size(0)); 
     unpackedData.get().convert(&header_1); 
     cout << "header_1:" << header_1 << endl; 
    } 
    // header 2 
    { 
     msgpack::unpacked unpackedData; 
     msgpack::unpack(unpackedData, 
         static_cast<const char*>(msg.raw_data(1)), 
         msg.size(1)); 
     unpackedData.get().convert(&header_2); 
     cout << "header_2:" << header_2 << endl; 
    } 
    // data 
    { 
     msgpack::unpacked unpacked_data; 
     msgpack::unpack(unpacked_data, 
         static_cast<const char*>(msg.raw_data(2)), 
         msg.size(2)); 
     unpacked_data.get().convert(&data); 
     std::cout << "data:" << data << std::endl; 
    } 

} 

EDIT: Проблема решена: Как отметил @ Йенс, правильный способ е упаковка/передача данных с помощью zmqpp::message::add_raw()

zmqpp::message msg; 
int64_t header_1 = 1234567; 
msgpack::sbuffer buffer; 
msgpack::pack(buffer, header_1); 
msg.add_raw(buffer.data(), buffer.size()); 

ответ

2

Я думаю, что вызовы msg.add(buffer.data(), buffer.size() не добавить массив buffer.size() байтов, но называют message::add(Type const& part, Args &&...args), который

  1. msg << buffer.data(), который, вероятно, вызывает message::operator<<(bool) поскольку указатель преобразуется в bool
  2. add(buffer.size()), который затем вызывает msg << buffer.size(), что добавляет значение size_t в качестве следующей части.

Глядя на класс сообщения zmqpp ::, используя сообщение :: add_raw, должен сделать трюк.

PS: Это все без гарантии, потому что я никогда не использовал zmqpp или msgpack.

+0

Спасибо, но даже с 'message :: add_raw()' У меня все тот же крах на подписчике. Кроме того, вместо 6, zmqpp теперь генерирует 5 частей ... и я не понимаю, почему ... – 865719

+2

@ 865719 Вы должны заменить все вызовы на message :: add with message :: add_raw. Вместо добавления int64 для обоих полей заголовка добавляются указатели buffer.data() и buffer.size(). Не могли бы вы добавить 'cout << msg.parts() << endl' после других couts? – Jens

+0

Да! Теперь zmqpp создает сообщение из трех частей, как ожидалось. Кроме того, в первом примере я ошибочно использовал индекс '3' вместо' 2' при распаковке вектора в полученной стороне. – 865719