2017-01-18 6 views
1

Я использую Spring AMQP для прослушивания сообщений (конфигурация имеет listener-container, service-activator, chain, bridge & aggregators). При запуске приложения AMQP начинает чтение сообщений, которые мы не хотим. Я пробовал auto-startup=false, но он не работает. Я что-то пропустил?AMQP Предотвращение автоматического чтения сообщений

Кроме того, если он действительно работает, то как мне программно запустить их снова? Я попробовал listenerContainer.start();. Что относительно агрегаторов & других?

EDIT

Ниже мой конфиг:

<rabbit:queue name="my_queue1" declared-by="consumerAdmin"/> 
<rabbit:queue name="my_queue2" declared-by="consumerAdmin"/> 
<rabbit:queue name="my_batch1" declared-by="consumerAdmin"/> 

<int-amqp:channel id="myPollableChannel" message-driven="false" connection-factory="consumerConnFactory" queue-name="my_queue2"/> 
<int-event:inbound-channel-adapter channel="myPollableChannel" auto-startup="false"/> 

<int-amqp:channel id="myAggregateChannel" connection-factory="consumerConnFactory"/> 
<int-event:inbound-channel-adapter channel="myAggregateChannel" auto-startup="false"/> 

<int-amqp:channel id="myChannel" connection-factory="consumerConnFactory"/> 
<int-event:inbound-channel-adapter channel="myChannel" auto-startup="false"/> 

<int-amqp:channel id="myFailedChannel" connection-factory="consumerConnFactory"/> 
<int-event:inbound-channel-adapter channel="myFailedChannel" auto-startup="false"/> 

<rabbit:template id="genericTopicTemplateWithRetry" connection-factory="connectionFactory" exchange="my_exchange" retry-template="retryTemplate"/> 

<rabbit:topic-exchange name="my_exchange" declared-by="consumerAdmin"> 
     <rabbit:bindings> 
      <rabbit:binding queue="my_queue1" pattern="pattern1"/> 
      <rabbit:binding queue="my_queue2" pattern="pattern1"/> 
     </rabbit:bindings> 
</rabbit:topic-exchange> 

<int:handler-retry-advice id="retryAdvice" max-attempts="5" recovery-channel="myFailedChannel"> 
    <int:exponential-back-off initial="3000" multiplier="5.0" maximum="300000"/> 
</int:handler-retry-advice> 

<int:bridge input-channel="myPollableChannel" output-channel="myAggregateChannel"> 
    <int:poller max-messages-per-poll="100" fixed-rate="5000"/> 
</int:bridge> 

<int:aggregator id="myBatchAggregator" 
    ref="myAggregator" 
    correlation-strategy="myCorrelationStrategy" 
    release-strategy="myReleaseStrategy" 
    input-channel="myAggregateChannel" 
    output-channel="myChannel" 
    expire-groups-upon-completion="true" 
    send-partial-result-on-expiry="true" 
    group-timeout="1000" /> 

<int:chain input-channel="myFailedChannel"> 
    <int:transformer expression="'Failed to publish messages to my channel:' + payload.failedMessage.payload" /> 
    <int-stream:stderr-channel-adapter append-newline="true"/> 
</int:chain> 

<int:service-activator input-channel="myChannel" output-channel="nullChannel" ref="myWorker" method="myMethod"> 
    <int:request-handler-advice-chain><ref bean="retryAdvice" /></int:request-handler-advice-chain> 
</int:service-activator> 

<rabbit:listener-container connection-factory="consumerConnFactory" requeue-rejected="false" concurrency="1"> 
    <rabbit:listener ref="myListener" method="listen" queue-names="queues1" admin="consumerAdmin" /> 
</rabbit:listener-container> 
+0

Вы должны поделиться своей конфигурацией. Не читайте сообщения из AMQP, на самом деле 'auto-startup = false' на адаптере входящего канала. Достаточно. Вам не нужно беспокоиться об агрегаторе и других –

+0

Спасибо Artem. Добавили мою конфигурацию выше. Я использую следующую функцию для включения его снова: 'частных недействительных startMessageListeners() { окончательных Map контейнеры = this.applicationContext.getBeansOfType (AbstractMessageListenerContainer.class); if (контейнеры! = Null &&! Container.isEmpty()) { for (final AbstractMessageListenerContainer container: container.values ​​()) { if (! Container.isRunning()) { container.start(); } } } } ' Прошу предложить. – Ameya

ответ

0

OK. Спасибо за настройку!

Не знаете, зачем вам нужны каналы AMQP-bascked, но главная проблема для вас - именно там.

Но обратите внимание, что <int-amqp:channel> имеет также auto-startup="false".

И вы будете готовы принять данные от AMQP, вам понадобятся только start() этих каналов по их id.

+0

Даже после обновления конфигурации, как показано ниже: ' ' Я до сих пор получаю: ' [org.springframework.integration.amqp.channel.PollableAmqpChannel] (task-scheduler-1) Вызов получения с таймаутом на PollableAmqpChannel. Тайм-аут будет проигнорирован, так как время ожидания приема не поддерживается. Я пропустил что-нибудь? – Ameya

+0

Конечно. Потому что вы должны остановить этот «мост» с '' для 'myPollableChannel'. Прости, я пропустил это. 'message-driven =" false "' означает не 'listener'. Таким образом, его жизненный цикл контролируется конечной точкой «poller». –

+0

У меня есть каналы, основанные на прослушивании, для запуска/остановки и предотвращения 'автоматического запуска '' PollableAmqpChannel'. Но я не мог программным способом «start()» it. Конфигурация/код: ' ' ' final Object channel = this.applicationContext.getBean ("myPollableChannel"); if (канал экземпляры SmartLifecycle) { возвращение ((SmartLifecycle) канал).стоп(); } else if (channel instanceof PollableAmqpChannel) { // FIXME: Невозможно передать в Lifecycle. Как остановить его? } ' – Ameya