2014-09-17 3 views
1

Я тестирую ZeroMQ, и я получаю только 1227 - 1276 сообщений в секунду. Однако я прочитал, что они должны превышать эту сумму в 100 раз.ZeroMQ производит небольшие результаты

Что я делаю неправильно? Есть ли какая-то конфигурация, которую я могу указать, чтобы исправить это?

Я использую следующие функции: класс

public static final String SERVER_LOCATION = "127.0.0.1"; 
public static final int SERVER_BIND_PORT = 5570; 

public static void receiveMessages() throws InvalidProtocolBufferException, FileNotFoundException, UnsupportedEncodingException{ 
    ZContext ctx = new ZContext(); 

    Socket frontend = ctx.createSocket(ZMQ.PULL); 
    frontend.bind("tcp://*:"+SERVER_BIND_PORT); 

    int i = 1; 
    do{ 
     ZMsg msg = ZMsg.recvMsg(frontend); 
     ZFrame content = msg.pop(); 
     if(content!= null){ 
      msg.destroy(); 
      System.out.println("Received: "+i); 
      i++; 
      content.destroy(); 
     } 
    }while(true); 
} 

public static void sendMessages() throws FileNotFoundException, UnsupportedEncodingException{ 
    ZContext ctx = new ZContext(); 
    Socket client = ctx.createSocket(ZMQ.PUSH); 

    client.setIdentity("i".getBytes()); 
    client.connect("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT); 

    PollItem[] items = new PollItem[] { new PollItem(client, Poller.POLLIN) }; 
    int i = 1; 
    Timer t = new Timer(timeToSpendSending); 
    t.start(); 
    do{ 
     client.send(/* object to send*/ , 0); 
     i++; 
    }while(!t.isDone()); 

    System.out.println("Done with "+i); 
} 

Таймер используется для ограничения времени программа работает для:

class Timer extends Thread{ 
    int time; 
    boolean done; 
    public Timer(int time){ 
     this.time = time; 
     done = false; 
    } 
    public void run(){ 
     try { 
      this.sleep(time); 
      done = true; 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
    public boolean isDone(){ 
     return done; 
    } 
} 

Edit: Я использую jeroMQ

<dependency> 
    <groupId>org.zeromq</groupId> 
    <artifactId>jeromq</artifactId> 
    <version>0.3.4</version> 
</dependency> 

ответ

0

I должен был заменить метод подключения и удалил метку высокой воды (установлен в 0 для неограниченных сообщений в памяти)

код будет выглядеть следующим образом:

public static final String SERVER_LOCATION = "127.0.0.1"; 
public static final int SERVER_BIND_PORT = 5570; 
public static final String TOPIC = "topic1"; 

public static void receiveMessages() throws InvalidProtocolBufferException, FileNotFoundException, UnsupportedEncodingException{ 
    // Prepare our context and subscribe 
     Context context = ZMQ.context(1); 
     Socket subscriber = context.socket(ZMQ.SUB); 

     subscriber.connect("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT); 
     subscriber.setRcvHWM(0); 
     subscriber.subscribe(TOPIC.getBytes()); 
     System.out.println("subscribed to "+TOPIC); 
     int i = 1; 
     boolean started = false; 
     Timer t = new Timer(timeToSpendSending); 
     do{ 
      String msg = subscriber.recvStr(); 
      if(!TOPIC.equals(msg)){ 
       if(!started){ 
        t.start(); 
        started = true; 
       } 
       i++; 
      } 
     }while(!t.isDone()); 
     System.out.println("Done with: "+i); 
     subscriber.close(); 
     context.term(); 
    } 
    public static void sendMessages() throws FileNotFoundException, UnsupportedEncodingException{ 
     Context context = ZMQ.context(1); 
     Socket publisher = context.socket(ZMQ.PUSH); 
     publisher.bind("tcp://"+SERVER_LOCATION+":"+SERVER_BIND_PORT); 
     publisher.setHWM(0); 
     publisher.setSndHWM(0); 

     int i = 1; 
     Timer t = new Timer(timeToSpendSending); 
     t.start(); 
     do{ 
      publisher.sendMore(TOPIC); 
      publisher.send("Test Data number "+i); 
      i++; 
     }while(!t.isDone()); 
     System.out.println("Done with: "+i); 
     publisher.close(); 
     context.term(); 
    } 

как это, я получил сообщение отсчитывает в диапазоне от 250000 в секунду при отправке и 145000 в секунду при получении.

 Смежные вопросы

  • Нет связанных вопросов^_^