2017-01-18 7 views
0

Я использую Spring Integration с ActiveMQ. Я определил DefaultMessageListenerContainer с maxConcurrentConsumers = 5. На него ссылаются в a. После того, как int-xml: validating-filter и int-xml: unmarshalling-transformer, я определил действие канала очередиInstructionTransformed. И у меня есть подольер для этого канала очереди. Когда я запускаю свое приложение, в консоли ActiveMQ я вижу, что соединение создано и внутри пяти сеансов.Весна Интеграция Несколько пользователей, не обрабатывающих одновременно

Теперь у меня есть @MessageEndpoint с методом аннотированный

@ServiceActivator(inputChannel = "actionInstructionTransformed", poller = @Poller(value = "customPoller")). 

Я получил заявление журнала при входе в метод. Обработка каждого сообщения длительная (несколько минут). В моих журналах я вижу, что thread-1 начинает обработку, а затем я могу видеть только выходы thread-1. Только когда thread-1 закончил обработку сообщения 1, я вижу, что thread-2 начинает обработку следующего сообщения и т. Д. У меня нет синхронизированного блока внутри моего класса, аннотированного @MessageEndpoint. Мне не удалось получить thread-1, thread-2 и т. Д. Сообщения о процессах одновременно.

Неужели кто-то испытал нечто подобное?

ответ

0

Посмотрите, вы говорите:

После Инт-XML: проверяющего-фильтр и ИНТ-XML: демаршаллизации-трансформатор, я определил канал очереди actionInstructionTransformed.

Теперь давайте вернемся к QueueChannel и PollingConsumerdefinitions!

С другой стороны, адаптер канал, соединенный с каналом, который реализует интерфейс org.springframework.messaging.PollableChannel (например, QueueChannel) будет производить экземпляр PollingConsumer.

И обратите внимание, что @Poller (PollerMetadata) имеет taskExecutor вариант.

По умолчанию TaskScedhuler запросить QueueChannel для данных периодически в соответствии с конфигурацией trigger. Если это PeriodicTrigger с параметрами по умолчанию, такими как fixedRate = false, следующий опрос действительно происходит после предыдущего. Вот почему вы видите только один поток.

Итак, попробуйте настроить taskExecutor, и ваши сообщения из этой очереди будут проходить параллельно.

concurrency на DefaultMessageListenerContainer не имеет эффекта. Потому что в конце вы разместите все эти сообщения на QueueChannel. И здесь новая модель Threading начинает работать на основе конфигурации @Poller.

+0

Большое спасибо за помощь. Я добавил task-executor = "customTaskExecutor" в мое определение int: poller и <свойство name = "corePoolSize" value = "5" /> У меня теперь есть 5 потоков обработки одновременно. –