Я пытаюсь создать маршрутизатор C-ZeroMQ, но кажется, что есть проблема с тем, как я это делаю. Мой маршрутизатор, кажется, инициализирован нулевым, что приводит к сбою остальной части моего клиентского кода. Любые советы о том, почему следует высоко ценить.CZMQ "zsocket_new" создание маршрутизатора всегда NULL?
Моя программа основана на коде CZMQ File Transfer Model 3, но когда я запускаю ее в своей системе, я обнаружил, что функция zsocket_new для создания «маршрутизатора», похоже, не создает надлежащего «маршрутизатора», поэтому, когда Я утверждаю (маршрутизатор), что моя программа вылетает.
Первоначально я обнаружил это, когда моя программа начала сбой в функции zsocket_set_hwm() несколько строк позже. Я исследовал, проверив мои журналы и обнаружил, что моя программа остановлена до управления полным «утверждают маршрутизатор»:
: zmq: attempting to open target file
: zmq: asserting ctx...
: zmq: asserted ctx...
: zmq: router value = 6
: zmq: created router
: zmq: asserting router......
Далее я загрузил программу в GDB и обнаружила, что маршрутизатор сохраняет инициализируются как 0х0.
(gdb) print ctx
$1 = <optimized out>
(gdb) print router
$2 = (void *) 0x0
(gdb) print complete_address
$3 = 0x7fffe80009a0 "tcp://127.0.0.1:6000"
Похоже, что существует проблема с объявлением маршрутизатора, но что может произойти с моим кодом? Я использую соответствующую клиентскую программу, которая не сообщает никаких проблем (вот немного о нем):
zctx_t *ctx = zctx_new();
void *dealer = zsocket_new (ctx, ZMQ_DEALER);
fprintf(fp_srv_log, "%s : %s\n", time_str, "ZMQ client thread launched successfully");
zsocket_bind (dealer, "tcp://*:6000");
fprintf(fp_srv_log, "%s : %s\n", time_str, "client: bound tcp://*:6000");
А вот код обижая, сервер:
int send_file(void *args, zctx_t *ctx, void *pipe)
{
time_t raw_time;
struct tm* timeinfo;
struct arg_struct *input = (struct arg_struct *)args;
const char *fp = input->file_name;
time(&raw_time);
timeinfo = localtime(&raw_time);
char * time_str = asctime(timeinfo);
char * complete_address = "tcp://127.0.0.1:6000";
FILE *fp_clnt_log = fopen("/var/log/myLog.log", "w");
int chunkNum = 0;
fprintf(fp_clnt_log, "%s : zmq: attempting to open target file \n", time_str);
FILE *file_to_xfer = fopen(fp, "r");
assert(file_to_xfer);
time_str = asctime(timeinfo);
fprintf(fp_clnt_log, "%s : zmq: asserting ctx...\n", time_str);
assert(ctx);
fprintf(fp_clnt_log, "%s : zmq: asserted ctx...\n", time_str);
fprintf(fp_clnt_log, "%s : zmq: router value = %d\n", time_str, ZMQ_ROUTER);
void *router = zsocket_new (ctx, ZMQ_ROUTER);
fprintf(fp_clnt_log, "%s : zmq: created router\n", time_str);
fprintf(fp_clnt_log, "%s : zmq: asserting router......\n", time_str);
assert(router);
fprintf(fp_clnt_log, "%s : zmq: asserted router successfully.\n", time_str);
//two parts per msg so HWM is size PIPELINE * 2
zsocket_set_hwm (router, PIPELINE * 2);
fprintf(fp_clnt_log, "%s : zmq: set hwm complete\n", time_str);
fprintf(fp_clnt_log, "%s : zmq: attempting to connect to %s\n", time_str, complete_address);
if(0 == zsocket_connect (router, complete_address))
{
fprintf(fp_clnt_log, "%s : zmq: connected to %s\n", time_str, complete_address);
}
else
{
fprintf(fp_clnt_log, "%s : zmq: failed to connect to %s\n", time_str, complete_address);
}
while (true)
{
time(&raw_time);
timeinfo = localtime(&raw_time);
time_str = asctime(timeinfo);
//first frame in each message is the sender identity
fprintf(fp_clnt_log, "%s : zmq:frame 1\n", time_str);
zframe_t *identity = zframe_recv(router);
fprintf(fp_clnt_log, "%s : zmq clnt: checking identity...\n", time_str);
if (!identity)
{
fprintf(fp_clnt_log, "%s : zmq clnt: no identity, breaking.\n", time_str);
break; //shut down and quit
}
fprintf(fp_clnt_log, "%s : zmq:frame 2\n", time_str);
//second frame is 'fetch' command
char *command = zstr_recv (router);
assert (streq (command, "fetch"));
free (command);
fprintf(fp_clnt_log, "%s : zmq:frame 3\n", time_str);
//third frame is chunk offset in file
char *offset_str = zstr_recv (router);
size_t offset = atoi (offset_str);
free (offset_str);
fprintf(fp_clnt_log, "%s : zmq:frame 4\n", time_str);
//fourth frame is max chunk size
char *chunksz_str = zstr_recv (router);
size_t chunksz = atoi (chunksz_str);
free (chunksz_str);
fprintf(fp_clnt_log, "%s : zmq: reading chunk\n", time_str);
//read chunk of data from file
fseek (file_to_xfer, offset, SEEK_SET);
byte *data = malloc (chunksz);
assert (data);
fprintf(fp_clnt_log, "%s : zmq: sending chunk\n", time_str);
//send resulting chunk to client
size_t size = fread (data, 1, chunksz, file_to_xfer);
zframe_t *chunk = zframe_new (data, size);
zframe_send (&identity, router, ZFRAME_MORE);
zframe_send (&chunk, router, 0);
//printf("Server: Sending chunk %d\n", chunkNum);
chunkNum++;
}
time(&raw_time);
timeinfo = localtime(&raw_time);
time_str = asctime(timeinfo);
fprintf(fp_clnt_log, "%s : closing file\n", time_str);
fclose(file_to_xfer);
fclose(fp_clnt_log);
return 0;
}