2017-01-15 4 views
0

Я пытаюсь создать простой поток, когда мой поток начинается с получения HTTP-запроса на отправку VIA HTTP Inbound Channel Adapter и публикует его в «SubscribableChannel». На этом канале может быть «N» количество подписчиков. На рисунке ниже показан поток.Spring Integration «Публикация канала подписки» с Spring DSL

enter image description here

Я пытаюсь использовать Spring DSL, чтобы настроить этот поток и возникли проблемы, делая работу. Ниже мой код.

@Bean 
public IntegrationFlow receiveHttpPost() { 
    return IntegrationFlows.from(Http.inboundChannelAdapter("/receive") 
      .mappedRequestHeaders("*") 
      .requestChannel(httpInAdapterPubSubChannel())) 
      .transform(new ObjectToStringTransformer()) 
      .get(); 
} 

@Bean 
public SubscribableChannel httpInAdapterPubSubChannel() 
{ 
    return MessageChannels.publishSubscribe("httpInAdapterPubSubChannel") 
    .get(); 
} 

@Bean 
public IntegrationFlow subscriber1() { 
    return IntegrationFlows.from(httpInAdapterPubSubChannel()) 
      .handle(message -> System.out.println("Enrich Headers based on Payload....")) 
      .get(); 
} 

@Bean 
public IntegrationFlow subscriber2() { 
    return IntegrationFlows.from(httpInAdapterPubSubChannel()) 
       .handle(message -> System.out.println("Save Payload to Audit Table...")) 
       .get(); 
} 

Когда я запускаю этот поток, я получаю «Не удалось обработать сообщение, вложенное исключение org.springframework.messaging.core.DestinationResolutionException: нет выходного канала или заголовок replyChannel доступен».

o.s.i.channel.PublishSubscribeChannel : preSend on channel 'httpInAdapterPubSubChannel', message: GenericMessage [payload=Test, headers={content-length=4, http_requestMethod=POST, accept-language=en-US,en;q=0.8, accept=*/*, host=localhost:8080, http_requestUrl=http://localhost:8080/receive, connection=keep-alive, content-type=text/plain;charset=UTF-8, id=2c6ee729-96ee-1ae5-be31-a9bc56092758, cache-control=no-cache, accept-encoding=gzip, deflate, br, user-agent=Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36, timestamp=1484457726393}] 
o.s.i.t.MessageTransformingHandler  : org.springframework.integration.transformer.MessageTransformingHandler#0 received message: GenericMessage [payload=Test, headers={content-length=4, http_requestMethod=POST, accept-language=en-US,en;q=0.8, accept=*/*, host=localhost:8080, http_requestUrl=http://localhost:8080/receive, connection=keep-alive, content-type=text/plain;charset=UTF-8, id=2c6ee729-96ee-1ae5-be31-a9bc56092758, cache-control=no-cache, accept-encoding=gzip, deflate, br, user-agent=Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36, timestamp=1484457726393}] 
o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.messaging.MessagingException: Failed to handle Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available] with root cause 

org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:287) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE] 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE] 

Очень очевидно, что я делаю что-то ОЧЕНЬ неправильно здесь. Я попытался найти примеры, показывающие «Публикация канала подписки» Интеграция VIA Spring DSL или конфигурация Java. К сожалению, я не смог найти: - /. Я был бы искренне благодарен, если кто-нибудь может предоставить мне пример и помочь мне узнать, что не так с потоком, который у меня есть.

Еще одно замечание, которое я сделал, было: «Когда я удалил подписчиков подписчиков1 и« подписчик2 », я все равно получаю ту же ошибку. Таким образом, это означает, что что-то не так, что я делаю при настройке y HttpInboundAdapter.

Кроме того, если я переключу 'httpInAdapterPubSubChannel' на прямой и только один поток маршрута (без ветвления), все будет работать нормально.

ответ

1

.transform(new ObjectToStringTransformer()) пытается отправить результат где-нибудь, но он не знает, где - входящий адаптер не ожидает ответа, а трансформатору некуда отправлять данные.

Возможно, вы имели в виду что-то вроде этого ...

@Bean 
public IntegrationFlow receiveHttpPost() { 
    return IntegrationFlows.from(Http.inboundChannelAdapter("/receive") 
     .mappedRequestHeaders("*")) 
     .transform(new ObjectToStringTransformer()) 
     .channel(httpInAdapterPubSubChannel()) 
     .get(); 
} 

т.е. отправить результат трансформатора в пабе/подканале.

Ссылка на DSL имеет несколько примеров; here и here например.

+0

Благодарим за отзыв и ссылки Гэри. Вы догадались, что правильно, я хотел преобразовать вход и отправить преобразованные данные в канал Pub-Sub и сделал ошибку. Изменили его, как вы рекомендовали, и он работает правильно. –

 Смежные вопросы

  • Нет связанных вопросов^_^