Я пытаюсь обрабатывать ситуацию управления потоком на стороне производителя. У меня есть очередь на qpid-broker с максимальным набором размеров очереди. Также установите flow_stop_count и flow_resume_count в очередь.Как обрабатывать управление потоком продукта в jms-сообщении при использовании apache qpid
В настоящее время производитель постоянно продолжает получать сообщения до тех пор, пока не достигнет этого flow_stop_count. При нарушении этого счетчика генерируется исключение, которое обрабатывается прослушивателем Exception. Теперь через некоторое время потребитель в очереди догонит и достигнет flow_resume_count. Вопрос в том, как производитель знает об этом событии.
Вот пример кода производителя
connection connection = connectionFactory.createConnection();
connection.setExceptionListenr(new MyExceptionListerner());
connection.start();
Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
Queue queue = (Queue)context.lookup("Test");
MessageProducer producer = session.createProducer(queue);
while(notStopped){
while(suspend){//---------------------------how to resume this flag???
Thread.sleep(1000);
}
TextMessage message = session.createTextMessage();
message.setText("TestMessage");
producer.send(message);
}
session.close();
connection.close();
и для исключения слушателя
private class MyExceptionListener implements ExceptionListener {
public void onException(JMSException e) {
System.out.println("got exception:" + e.getMessage());
suspend=true;
}
}
Теперь exceptionlistener является общим слушателем исключений, поэтому она не должна быть хорошей идеей, чтобы приостановить через него протекает производитель.
Что мне нужно, это, пожалуй, какой-то метод на уровне производителя, что-то вроде produer.isFlowStopped(), который я могу использовать для проверки перед отправкой сообщения. Существует ли такая функциональность в qpid api.
Существует документация по qpid website, в которой говорится, что это можно сделать. Но я не мог найти примеров того, как это делается где угодно.
Существует ли стандартный способ обработки такого сценария.
Да. Я должен был сделать именно это. Получив исключение, я прекратил отправку сообщения, а затем опрос, чтобы проверить размер очереди. Qpid api предоставляет только способ узнать размер текущей очереди. Он не указывает, когда поток прекращается. Поэтому я использую размер очереди при исключении и останавливаю продюсера, когда размер очереди падает до 80% от максимального значения (размер очереди при исключении). – Raks
Да, вероятно, лучше быть превентивным. Способ построения qpid довольно интересен, когда очередь заполняется. Затем он блокирует отправку в течение максимум двух минут, прежде чем выбросить исключение. Это означает, что ваш код производителя может стать нестабильным, если вы не можете иметь дело с длительными блокирующими действиями. – aldridmc