2017-02-20 33 views
1

У нас есть контекст интеграции Spring с 2 агрегаторами, некоторыми трансформаторами, адаптерами и т. Д. Этот контекст потребляет сообщения из очередей ActiveMQ, а также 2 веб-сервиса и помещает их в агрегатор. Мы хотим добиться того, чтобы каждый раз, когда мы получаем сообщение в конкретной очереди, мы сбрасываем все сообщения в агрегаторах и каждый компонент statefull запускает каждый пакетный процесс (инициированный стартовым сообщением в очереди) полностью пустым и чистый.Способ сброса всех сообщений в контексте интеграции Spring

Итак, как сбросить компоненты агрегатора на основе сообщения, полученного в очереди?

+0

Мне интересно, почему корреляционная стратегия не работает для вас, чтобы достичь границ партии, начиная с этого конкретного сообщения ... –

+0

Я не уверен, что я вас понимаю. Проблема в том, что в середине пакетного процесса мы можем получить другое сообщение о запуске партии, поэтому мы должны отбросить и убить текущую информацию и начать заново. – JonathanVila

ответ

1

Ваш вариант использования не ясно для меня, но то, что вы хотите получить может быть достичь с помощью MessageGroupStoreReaper:

* Convenient configurable component to allow explicit timed expiry of {@link MessageGroup} instances in a 
* {@link MessageGroupStore}. This component provides a no-args {@link #run()} method that is useful for remote or timed 
* execution and a {@link #destroy()} method that can optionally be called on shutdown. 

Если настроить его для агрегатора MessageGroupStore, он будет выполнять обратный вызов, зарегистрированный из есть:

store.registerMessageGroupExpiryCallback(
      (messageGroupStore, group) -> this.forceReleaseProcessor.processMessageGroup(group)); 

и если вы ничего не настроить на агрегаторе, ваши сообщения будут удалены (по умолчанию NullChannel) и группы будут удалены из магазина.

Итак, когда этот тип сообщения прибыл, вы должны позвонить MessageGroupStoreReaper.run() и только после этого отправить его в процесс.