2016-02-10 3 views
2

надеюсь, что это последний вопрос, который я задаю на интеграцию с весной.Весенний интеграционный поток: выполнить задачу в потоке

Столкнувшись следующая проблема: в конце довольно длинного IntegrationFlow Dsl листа есть код:

return IntegrationFlows. 
    //... 
     .enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header("jms_replyTo", responseQueue(), true)) // IntegrationMessageHeaderAccessor.CORRELATION_ID is not acceptable though message came from outgoingGateway of another application with this header been set 
     .handle(requestRepository::save) 
     .handle(
      Jms.outboundAdapter(queueConnectionFactory()).destination(serverQueue()) 
     ) 
     .get(); 

Проблема заключается в том, что после некоторого кода, как обработчики requestRepository::save цепь становится неработающей. Этот трюк работает только в том случае, если в качестве параметра обработчика принят шлюз.

Как я могу преодолеть это ограничение? Я думаю, что использование wireTap здесь не будет заключать сделку, потому что это асинхронно. Здесь, на самом деле, я сохраняю сообщение для хранения его заголовка jms_replyTo и заменяю его сохраненным после того, как соответствующий ответ возвращается с сервера (шаблон интеграции с корпоративной Smart Proxy).

Любые предложения, пожалуйста?

ответ

1

Не знаете, почему вы говорите «последний вопрос». Собираетесь ли вы отказаться от весенней интеграции? :-(

Я предполагаю проблему там со следующей .handle(), потому что ваш requestRepository::save является односторонний MessageHandler (void возвратом из метода save()). Или ваш save() возвращается null.

IntegrationFlow представляет собой цепь расстрелы, когда следующий будет называться после того, как предыдущий с ненулевым результатом.

Итак, поделитесь своими requestRepository::save, пожалуйста!

UPDATE

Ни сделали помощь объявляя MessageHandler боб как (м) -> requestRepository.save (м) и передать его в ручку метода (..), как пары.

Да ... Я хотел бы видеть подпись для вашего requestRepository::save.

Итак, посмотрите. Используя ссылку на метод для .handle(), вы должны определить свой сценарий. Если вы делаете ручку one-way с остановкой потока, достаточно выполнить с контрактом org.springframework.messaging.MessageHandler. с, что вы метод подпись должна быть такая:

public void myHandle(Message<?> message) 

Если вы хотите, чтобы идти вперед по течению, вы должны вернуть что-либо из метода обслуживания. И этот результат станет payload для следующих .handle(). В этом случае ваш метод должен следовать с договором org.springframework.integration.dsl.support.GenericHandler. Ваш метод подписи может выглядеть следующим образом:

public Bar myHandle(Foo foo, Map<String, Object> headers) 

Вот как эталонный метод работает для .handle().

Вы должны понимать, как работает этот стиль method chain. Выходом предыдущего метода является ввод следующего. В этом случае мы защищаем поток от мертвого кода, например MessageHandler с возвратом void, но есть следующий элемент потока.Вот почему вы видите ошибку This is the end of the integration flow..

+0

Правильно, запрос 'save' метода repo возвращает ничего. У меня нет кода прямо сейчас на моих глазах, но он просто сохраняет сообщение в db и метод, объявленный как 'void'. (Нет, я говорю последнее, потому что у меня такое чувство, о котором я часто прошу.) Спасибо за совет, постараюсь вернуть значение. –

+0

:-). Нет проблем для просить! Я не чувствую себя хорошо во многих других вещах, поэтому я тоже вынужден спросить. С другой стороны, моя работа - отвечать на подобные вопросы и помогать людям понять и использовать Framework. С другой стороны, такая обратная связь, как ваша, поможет нам улучшить структуру. На данный момент похоже, что документация или JavaDocs не понятны, и эта цепочка вызовов должна быть лучше объяснена. –

+0

Точно, Артем. Это то, что я только хотел заметить. Spring framework - это шедевр, и его документация всегда была очень приятной и полезной. Но интеграция с весной кажется более сложным, чем другие. Документация показывает каждый аспект отдельно, но трудно понять, как объединить все части вместе. Мои коллеги мнения те же. Был бы очень рад, если вы немного улучшите документацию, чтобы описать, как организовать потоки. (Еще один вопрос - есть ли какие-либо схемы интеграции, которые вы собираетесь реализовать в будущем в этом модуле?) Спасибо. –

0

Наконец придумали решение:

@Bean 
public MessageHandler requestPersistingHandler() { 
    return new ServiceActivatingHandler(message -> { 
     requestRepository.save(message); 
     return message.getPayload(); 
    }); 
} 

//... 
@Bean 
public IntegrationFlow requestFlow() { 
    return IntegrationFlows.from(
      Jms.messageDrivenChannelAdapter(queueConnectionFactory()).destination(bookingQueue()) 
    ) 
      .wireTap(controlBusMessageChannel()) 
      .enrichHeaders(headerEnricherSpec -> headerEnricherSpec.header(JMS_REPLY_HEADER, responseQueue(), true)) 
      .handle(requestPersistingHandler()) 
      .handle(
        Jms.outboundAdapter(queueConnectionFactory()).destination(serverQueue()) 
      ) 
      .get(); 
} 

Я просто не уверен, что если есть более прямой путь.

Единственная проблема заключается в том, как изменить заголовок в «from-the-server» IntegrationFlow внутри метода enrichHeaders: понятия не имею, как получить доступ к существующим заголовкам со спецификацией.