Я использую Spring AMQP для прослушивания сообщений (конфигурация имеет listener-container, service-activator, chain, bridge & aggregators
). При запуске приложения AMQP начинает чтение сообщений, которые мы не хотим. Я пробовал auto-startup=false
, но он не работает. Я что-то пропустил?AMQP Предотвращение автоматического чтения сообщений
Кроме того, если он действительно работает, то как мне программно запустить их снова? Я попробовал listenerContainer.start();
. Что относительно агрегаторов & других?
EDIT
Ниже мой конфиг:
<rabbit:queue name="my_queue1" declared-by="consumerAdmin"/>
<rabbit:queue name="my_queue2" declared-by="consumerAdmin"/>
<rabbit:queue name="my_batch1" declared-by="consumerAdmin"/>
<int-amqp:channel id="myPollableChannel" message-driven="false" connection-factory="consumerConnFactory" queue-name="my_queue2"/>
<int-event:inbound-channel-adapter channel="myPollableChannel" auto-startup="false"/>
<int-amqp:channel id="myAggregateChannel" connection-factory="consumerConnFactory"/>
<int-event:inbound-channel-adapter channel="myAggregateChannel" auto-startup="false"/>
<int-amqp:channel id="myChannel" connection-factory="consumerConnFactory"/>
<int-event:inbound-channel-adapter channel="myChannel" auto-startup="false"/>
<int-amqp:channel id="myFailedChannel" connection-factory="consumerConnFactory"/>
<int-event:inbound-channel-adapter channel="myFailedChannel" auto-startup="false"/>
<rabbit:template id="genericTopicTemplateWithRetry" connection-factory="connectionFactory" exchange="my_exchange" retry-template="retryTemplate"/>
<rabbit:topic-exchange name="my_exchange" declared-by="consumerAdmin">
<rabbit:bindings>
<rabbit:binding queue="my_queue1" pattern="pattern1"/>
<rabbit:binding queue="my_queue2" pattern="pattern1"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<int:handler-retry-advice id="retryAdvice" max-attempts="5" recovery-channel="myFailedChannel">
<int:exponential-back-off initial="3000" multiplier="5.0" maximum="300000"/>
</int:handler-retry-advice>
<int:bridge input-channel="myPollableChannel" output-channel="myAggregateChannel">
<int:poller max-messages-per-poll="100" fixed-rate="5000"/>
</int:bridge>
<int:aggregator id="myBatchAggregator"
ref="myAggregator"
correlation-strategy="myCorrelationStrategy"
release-strategy="myReleaseStrategy"
input-channel="myAggregateChannel"
output-channel="myChannel"
expire-groups-upon-completion="true"
send-partial-result-on-expiry="true"
group-timeout="1000" />
<int:chain input-channel="myFailedChannel">
<int:transformer expression="'Failed to publish messages to my channel:' + payload.failedMessage.payload" />
<int-stream:stderr-channel-adapter append-newline="true"/>
</int:chain>
<int:service-activator input-channel="myChannel" output-channel="nullChannel" ref="myWorker" method="myMethod">
<int:request-handler-advice-chain><ref bean="retryAdvice" /></int:request-handler-advice-chain>
</int:service-activator>
<rabbit:listener-container connection-factory="consumerConnFactory" requeue-rejected="false" concurrency="1">
<rabbit:listener ref="myListener" method="listen" queue-names="queues1" admin="consumerAdmin" />
</rabbit:listener-container>
Вы должны поделиться своей конфигурацией. Не читайте сообщения из AMQP, на самом деле 'auto-startup = false' на адаптере входящего канала. Достаточно. Вам не нужно беспокоиться об агрегаторе и других –
Спасибо Artem. Добавили мою конфигурацию выше. Я использую следующую функцию для включения его снова: 'частных недействительных startMessageListeners() { окончательных Map контейнеры = this.applicationContext.getBeansOfType (AbstractMessageListenerContainer.class); if (контейнеры! = Null &&! Container.isEmpty()) { for (final AbstractMessageListenerContainer container: container.values ()) { if (! Container.isRunning()) { container.start(); } } } } ' Прошу предложить. –
Ameya