Я создал соединение PUB/SUB с использованием zmqpp, и теперь я хочу отправить данные от издателя подписчикам, используя только заголовок , Версия C++ 11 от msgpack-c.Передача данных через ZeroMQ (zmqpp) с использованием MsgPack дает ошибку msgpack :: v1 :: lack_bytes
Издатель должен отправить 2 int64_t
номера - header_1
и header_2
- за которыми следует std::vector<T>
- data
-, где T
определяется (header_1, header_2)
комбинации.
Sinse не так много примеров того, как объединить msgpack и zmqpp, идея, которую я придумал, состоит в том, чтобы отправить сообщение из 3-х частей, используя zmqpp::message::add/add_raw
. Каждая часть будет packed/unpacked с использованием msgpack.
Издатель пакеты одной части данных следующим образом:
zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());
И приемник распаковывает это так:
zmqpp::message msg;
subscriberSock.receive(msg);
int64_t header_1;
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(0)),
msg.size(0));
unpackedData.get().convert(&header_1);
Когда я запускаю код, я получаю следующее сообщение об ошибке на абонента сторона:
terminate called after throwing an instance of 'msgpack::v1::insufficient_bytes'
what(): insufficient bytes
Aborted
Также кажется, что zmqpp сгенерировал сообщение из 5 частей, хотя я add()
всего 3 раза.
Q1: Я правильно упаковываю/распаковываю данные?
Q2: Правильно ли это метод отправки буферов msgpack с использованием zmqpp?
Вот важные части кода:
Издательство
zmqpp::socket publisherSock;
/* connection setup stuff ...*/
// forever send data to the subscribers
while(true)
{
zmqpp::message msg;
// meta info about the data
int64_t header_1 = 1234567;
int64_t header_2 = 89;
// sample data
std::vector<double> data;
data.push_back(1.2);
data.push_back(3.4);
data.push_back(5.6);
{
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());
cout << "header_1:" << header_1 << endl; // header_1:1234567
}
{
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_2);
msg.add(buffer.data(), buffer.size());
cout << "header_2:" << header_2 << endl; // header_2:89
}
{
msgpack::sbuffer buffer;
msgpack::pack(buffer, data);
msg.add_raw(buffer.data(), buffer.size());
std::cout << "data: " << data << std::endl; // data:[1.2 3.4 5.6]
}
std::cout << msg.parts() << " parts" << std::endl; // prints "5 parts"... why ?
publisherSock.send(msg);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
Абонента
zmqpp::socket subscriberSock;
/* connection setup stuff ...*/
zmqpp::message msg;
subscriberSock.receive(msg);
int64_t header_1;
int64_t header_2;
std::vector<double> data;
std::cout << msg.parts() << " parts" << std::endl; // prints "5 parts"
{
// header 1
{
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(0)),
msg.size(0));
unpackedData.get().convert(&header_1);
cout << "header_1:" << header_1 << endl;
}
// header 2
{
msgpack::unpacked unpackedData;
msgpack::unpack(unpackedData,
static_cast<const char*>(msg.raw_data(1)),
msg.size(1));
unpackedData.get().convert(&header_2);
cout << "header_2:" << header_2 << endl;
}
// data
{
msgpack::unpacked unpacked_data;
msgpack::unpack(unpacked_data,
static_cast<const char*>(msg.raw_data(2)),
msg.size(2));
unpacked_data.get().convert(&data);
std::cout << "data:" << data << std::endl;
}
}
EDIT: Проблема решена: Как отметил @ Йенс, правильный способ е упаковка/передача данных с помощью zmqpp::message::add_raw()
zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add_raw(buffer.data(), buffer.size());
Спасибо, но даже с 'message :: add_raw()' У меня все тот же крах на подписчике. Кроме того, вместо 6, zmqpp теперь генерирует 5 частей ... и я не понимаю, почему ... – 865719
@ 865719 Вы должны заменить все вызовы на message :: add with message :: add_raw. Вместо добавления int64 для обоих полей заголовка добавляются указатели buffer.data() и buffer.size(). Не могли бы вы добавить 'cout << msg.parts() << endl' после других couts? – Jens
Да! Теперь zmqpp создает сообщение из трех частей, как ожидалось. Кроме того, в первом примере я ошибочно использовал индекс '3' вместо' 2' при распаковке вектора в полученной стороне. – 865719