2016-07-14 6 views
1

Рассмотрим следующие два процесса:ZeroMQ REQ/REP

sender.cpp:

#include <zhelpers.h> 
... 
zmq::context_t ctx(1); 
... 
void foo(int i) 
{ 
    zmq::socket_t sender(ctx, ZMQ_REQ); 
    sender.connect("tcp://hostname:5000"); 

    std::stringstream ss; 
    ss <<"bar_" <<i; 
    std::string bar_i(std::move(ss.str()); 

    s_sendmore(sender, "foo "); 
    (i != N) ? s_send(sender, bar, 0) : s_send(sender, "done", 0); 
    s_recv(sender); 
} 

int main() 
{ 
    for(int i=0; i<=100000; ++i) 
     foo(i); 
    return 0; 
} 

receiver.cpp

#include <zhelpers.h> 
... 
int main() 
{ 
    zmq::context_t ctx(1); 
    zmq::socket_t rcv(ctx, ZMQ_REP); 
    rcv.bind("tcp://*:5000"); 

    std::string s1(""); 
    std::string s2(""); 

    while(s2 != "done") 
    { 
     s1 = std::move(s_recv(rcv)); 
     s2 = std::move(s_recv(rcv)); 
     std::cout <<"received: " <<s1 <<" " <<s2 <<"\n"; 
     s_send(rcv, "ACK"); 
    } 

    return 0; 
} 

Начнем два процесса. То, что я бы ожидать, что процесс приемник будет принимать все сообщения, отправитель посылает к нему, и он будет печатать:

foo bar_1 
foo bar_2 
... 

и так далее, до:

... 
foo bar_100000 

И I Ожидаю, что он сделает это без каких-либо блокировок.

Моя проблема в том, что приемник всегда придерживается 28215-й итерации (всегда вокруг этого числа !!!) и блокируется до минуты или около того. Затем он идет дальше до 100000, но иногда он снова прилипает. Мой вопрос, конечно, почему это происходит? Как я могу это исправить?

Я попытался поместить 'sender' внутри foo (.) В глобальную область, а затем он сработал: в этом случае все распечатки шли от 1 до 100000 плавно и супер-быстро, без каких-либо блокировок Конечно, в этом случае сокет не создавался каждый раз, когда вызывался foo (.)). Но, к сожалению, в моем коде не могу этого сделать.

Хотелось бы понять, почему происходит этот блок.

+0

Максимальные сокеты могут быть ограничены на стороне сервера. Попытайтесь его увеличить, это может решить проблему. потому что для tcp требуется время для очистки мертвых сокетов, и у вас есть много тех, на которые вы нажимаете максимальное количество сокетов. – somdoron

ответ

0

Прежде всего, ваши примеры не очень надежны, поскольку они не компилируются. Так вот некоторые exapmles, которые должны быть близки к вашему намерению и компилировать

sender.cpp

#include <zmq.hpp> 
#include <string> 
#include <iostream> 
#include <string> 

void send(const std::string& msg) 
{ 
    // Prepare our context and socket 
    zmq::context_t context (1); 
    zmq::socket_t socket (context, ZMQ_REQ); 

    std::cout << "Connecting to receiver ..." << std::endl; 
    socket.connect ("tcp://localhost:5555"); 

    zmq::message_t request (100); 
    memcpy (request.data(), msg.c_str(), 100); 
    std::cout << "Sending message " << msg << "..." << std::endl; 
    socket.send (request); 
} 

int main() 
{ 
    for(int i = 0; i < 100000; ++i) 
    { 
     send(std::to_string(i)); 
    } 
    send("done"); 
} 

использовать что-то Линк

g++ -std=c++11 -I/home/dev/cppzmq -I/home/dev/libzmq/include sender.cpp -lzmq -o sender 

receiver.cpp

#include <zmq.hpp> 
#include <string> 
#include <cstring> 
#include <iostream> 

int main() { 
    // Prepare our context and socket 
    zmq::context_t context (1); 
    zmq::socket_t socket (context, ZMQ_REP); 
    socket.bind ("tcp://*:5555"); 

    char buf[100] = {0}; 
    while (std::string(buf).compare("done")) { 
     zmq::message_t request; 

     // Wait for next request from client 
     socket.recv (&request); 
     std::memcpy(buf, request.data(), 100); 
     std::cout << "Received message " << buf << std::endl; 

     // Send reply back to client 
     zmq::message_t reply (5); 
     memcpy (reply.data(), "Hello", 5); 
     socket.send (reply); 
    } 
    return 0; 
} 

использование

g++ -std=c++11 -I/home/dev/cppzmq -I/home/dev/libzmq/include receiver.cpp -lzmq -o receiver 

при запуске процессов, кажется, что все работает нормально, выход на приемнике, как ожидается, без перерывов:

Received message 99996 
Received message 99997 
Received message 99998 
Received message 99999 
Received message done 

но то, что я ожидал: взглянуть на NetStat:

netstat 
Active Internet connections (w/o servers) 
Proto Recv-Q Send-Q Local Address   Foreign Address   State  
tcp  0  0 localhost:38345   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:46228   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:60309   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:46916   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:47600   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:54454   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:46409   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:51142   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:40355   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:40005   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:45614   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:48974   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:41427   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:58740   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:58754   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:60044   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:57478   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:50419   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:44361   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:37284   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:38662   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:45968   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:57407   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:59200   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:41292   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:55243   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:51489   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:48865   localhost:5555   TIME_WAIT 
tcp  0  0 localhost:35491   localhost:5555   TIME_WAIT 
... 

У меня есть более 20k (!) Таких сокетов в состоянии TIME_WAIT после одного запуска. Это связано с переменным объемом socket в void send(...) в sender.cpp. Я не знаю, что именно делает zmq при уничтожении сокета, когда он выходит из области видимости, но я уверен, что он вызовет close() на fd сокета где-нибудь, что приведет ваш сокет в это состояние TIME_WAIT. Даже если мой отправитель и получатель работают плавно, я не знаю, как ваша система обрабатывает эти много сокетов. Кроме того, я не знаю, что делает ваш файл zhelpers.h.Но я знаю, что если вы поместите свой сокет в глобальную область видимости, только один вызов close() будет выполняться на стороне отправителя в одном гнезде. Я начал здесь больше исследовать. Возможно, посмотрите на how-to-forcibly-close-a-socket-in-time-wait ...

+0

Спасибо. Будет проверено. Извините, что мой код не компилировался. Я просто хотел показать вам эту проблему (поэтому я использовал, например, «...»). Я не хотел идти на все подробности. Например, zhelpers.h можно найти здесь: https://github.com/imatix/zguide2/tree/master/examples/C%2B%2B – gybacsi

+1

Еще одна вещь: может быть, вы просто не хотите объявлять свои отправляя переменную сокета в глобальную область видимости, но вы решаете проблему с тысячами уже заявленных сокетов TIME_WAIT, если просто сделать область немного больше, то есть объявить ее за пределами своего цикла отправки и повторно использовать ее, по крайней мере, для отправки сообщений 100000 , – yussuf