2012-01-13 1 views
2

Я пытаюсь обрабатывать ситуацию управления потоком на стороне производителя. У меня есть очередь на qpid-broker с максимальным набором размеров очереди. Также установите flow_stop_count и flow_resume_count в очередь.Как обрабатывать управление потоком продукта в jms-сообщении при использовании apache qpid

В настоящее время производитель постоянно продолжает получать сообщения до тех пор, пока не достигнет этого flow_stop_count. При нарушении этого счетчика генерируется исключение, которое обрабатывается прослушивателем Exception. Теперь через некоторое время потребитель в очереди догонит и достигнет flow_resume_count. Вопрос в том, как производитель знает об этом событии.

Вот пример кода производителя

connection connection = connectionFactory.createConnection(); 
    connection.setExceptionListenr(new MyExceptionListerner()); 
    connection.start(); 
    Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); 
    Queue queue = (Queue)context.lookup("Test"); 
    MessageProducer producer = session.createProducer(queue); 
    while(notStopped){ 
     while(suspend){//---------------------------how to resume this flag??? 
      Thread.sleep(1000); 
     } 
     TextMessage message = session.createTextMessage(); 
     message.setText("TestMessage"); 
     producer.send(message); 
    } 
    session.close(); 
    connection.close(); 

и для исключения слушателя

private class MyExceptionListener implements ExceptionListener { 
    public void onException(JMSException e) { 
     System.out.println("got exception:" + e.getMessage()); 
     suspend=true; 
    } 
} 

Теперь exceptionlistener является общим слушателем исключений, поэтому она не должна быть хорошей идеей, чтобы приостановить через него протекает производитель.

Что мне нужно, это, пожалуй, какой-то метод на уровне производителя, что-то вроде produer.isFlowStopped(), который я могу использовать для проверки перед отправкой сообщения. Существует ли такая функциональность в qpid api.

Существует документация по qpid website, в которой говорится, что это можно сделать. Но я не мог найти примеров того, как это делается где угодно.

Существует ли стандартный способ обработки такого сценария.

ответ

1

Из того, что я прочитал из документации Apache QPid, кажется, что flow_resume_count и flow_stop_count заставят производителей начать блокировку.

Поэтому единственным вариантом было бы программное обеспечение, чтобы опросить регулярно, пока сообщения не начнут снова течь.

Экстракт от here.

Если производитель отправляет в чрезмерную очередь, брокер отвечает, поручая клиенту не отправлять больше сообщений. Воздействие этого заключается в том, что любые будущие попытки отправки блокируются до тех пор, пока брокер не отменяет порядок управления потоком.

При блокировке клиента будет периодически регистрироваться тот факт, что он заблокирован в ожидании контроля потока.

WARN AMQSession - Брокер исполнение управление потоком было исполнено WARN AMQSession - сообщение отправить задерживается на 5 секунд из-за брокера соблюдение контроля потока WARN AMQSession - Сообщение отправить задерживается на 10 секунд из-за брокера соблюдение контроля потока После заданного периода отправить тайм-аут и бросить JMSException на вызывающий код.

ОШИБКА AMQSession - Ошибка отправки сообщения из-за ожидания ожидания при включенном управлении потоком брокера.

Из этой документации подразумевается, что программное обеспечение, управляющее производителем, должно было бы самостоятельно управлять. Таким образом, в основном, когда вы получаете исключение, что очередь является чрезмерной, вам нужно будет отступить и, скорее всего, опросить и повторно отправить свои сообщения.

+0

Да. Я должен был сделать именно это. Получив исключение, я прекратил отправку сообщения, а затем опрос, чтобы проверить размер очереди. Qpid api предоставляет только способ узнать размер текущей очереди. Он не указывает, когда поток прекращается. Поэтому я использую размер очереди при исключении и останавливаю продюсера, когда размер очереди падает до 80% от максимального значения (размер очереди при исключении). – Raks

+0

Да, вероятно, лучше быть превентивным. Способ построения qpid довольно интересен, когда очередь заполняется. Затем он блокирует отправку в течение максимум двух минут, прежде чем выбросить исключение. Это означает, что ваш код производителя может стать нестабильным, если вы не можете иметь дело с длительными блокирующими действиями. – aldridmc

1

Вы можете попробовать установить емкость (размер в байтах, в которой очередь считается полной) и flowResumeCapacity (размер очереди, при которой производители не разгружаются) для очереди.

send() будет заблокирован, если размер превышает значение емкости.

Вы можете взглянуть на это test case file в репо, чтобы получить представление об этом.