2015-08-17 4 views
2

Мы используем активный активный кластер RabbitMQ с зеркальной очередью. С зеркальной политикой является:Настройка аннулирования пользователей в RabbitMQ

«политики»: [{"vhost": "/", "name": "ha-all", "pattern": "", "apply-> to": "все", "определение": { "ха-режим": "все", "ха-синхронизации в режиме": "автоматический"}, "приоритет": 0}]

Версии: RabbitMQ 3.5. 4, Erlang 17.4, spring-amqp/spring-rabbit: 1.4.5.RELEASE

Теперь мы пытаемся добиться отмены потребителя, как указано в Highly Available Queues.

Однако, поскольку мы не использовали канал, мы не можем использовать метод {{basicConsumer}}, как указано в приведенной выше ссылке.

Как мне установить «x-cancel-on-ha-failover» «true в конфигурации, себе?

С фасолью XML является, таким образом:

<rabbit:connection-factory id="connectionFactory" 
    addresses="localhost:5672" 
    username="guest" 
    password="guest" 
    channel-cache-size="5" /> 


<!-- CREATE THE JsonMessageConverter BEAN --> 
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter" /> 


<!-- Spring AMQP Template --> 
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" retry-template="retryTemplate" message-converter="jsonMessageConverter" /> 

<!-- in case connection is broken then Retry based on the below policy --> 
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> 
<property name="backOffPolicy"> 
    <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> 
    <property name="initialInterval" value="500" /> 
    <property name="multiplier" value="2" /> 
    <property name="maxInterval" value="30000" /> 
</bean> 
</property> 
</bean> 

<rabbit:queue name="testQueue" durable="true"> 
    <rabbit:queue-arguments> 
     <entry key="x-max-priority"> 
      <value type="java.lang.Integer">10</value> 
     </entry> 
    </rabbit:queue-arguments> 
</rabbit:queue> 

<bean id="messsageConsumer" class="consumer.RabbitConsumer"> 
</bean> 
<rabbit:listener-container 
    connection-factory="connectionFactory" concurrency="5" max-concurrency="5"  message-converter="jsonMessageConverter"> 
<rabbit:listener queues="testQueue" ref="messsageConsumer" /> 
</rabbit:listener-container> 

ответ

1

<rabbit:listener-container> фактически заполняет SimpleMessageListenerContainer боб на фоне. И последний поддерживает public void setConsumerArguments(Map<String, Object> args) по этому вопросу.

Итак, чтобы исправить ваши требования, вам просто нужно собрать необработанные SimpleMessageListenerContainer<bean> для вашего messsageConsumer.

Тем временем вы исправляете это для своего приложения, я бы попросил у вас JIRA относительно добавления компонента <consumer-arguments>. И мы сможем обратиться к нему с текущим сроком GA.

+0

Спасибо Артем. Казалось, что компонент SimpleMessageListenerContainer работает. Однако отмена Потребителя не отменяет/останавливает поток, который выполнял сообщение. Сообщение переустановлено и обрабатывается дважды. –