2016-10-27 3 views
0

Как я могу реализовать idempotent Amqp.inboundAdapter()?Idempotent Amqp.inboundAdapter()

Я попытался использовать IdempotentReceiverInterceptor, но он не работает с MessageProducer.

EDIT

@Bean 
IntegrationFlow someFlow(/*...*/) { 
    return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "myqueue") 
     .transform(Transformers.fromJson(), c -> c.advice(idempotentInterceptor)) 
     .channel("anotherChannel.input") 
     .get(); 
} 

ответ

0

Вы можете просто применить перехватчик к первому компоненту ниже по потоку от входящего адаптера. Если первым каналом является pub/sub, добавьте мост.

В качестве альтернативы вы можете написать индивидуальный совет и добавить его в цепочку консультаций контейнера входящего адаптера.

EDIT

@SpringBootApplication 
public class So40289644Application { 

    public static void main(String[] args) throws Exception { 
     ConfigurableApplicationContext context = SpringApplication.run(So40289644Application.class, args); 
     RabbitTemplate template = context.getBean(RabbitTemplate.class); 
     template.convertAndSend("myqueue", "foo"); 
     template.convertAndSend("myqueue", "bar"); 
     context.getBean(CountDownLatch.class).await(10, TimeUnit.SECONDS); 
     context.getBean(RabbitAdmin.class).declareQueue(new Queue("myqueue")); 
     context.close(); 
    } 

    @Bean 
    public Jackson2JsonMessageConverter converter() { 
     return new Jackson2JsonMessageConverter(); 
    } 

    @Bean 
    public CountDownLatch latch() { 
     return new CountDownLatch(1); 
    } 

    @Bean 
    IntegrationFlow someFlow(ConnectionFactory connectionFactory) { 
     return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "myqueue")) 
       .channel("in") 
       .get(); 
    } 

    @Bean 
    @Transformer(inputChannel = "in", adviceChain = "idempotentInterceptor", outputChannel = "next") 
    public JsonToObjectTransformer transformer() { 
     return Transformers.fromJson(); 
    } 

    @Bean 
    IntegrationFlow remaining() { 
     return IntegrationFlows.from("next") 
      .handle(m -> { 
       System.out.println(m); 
       latch().countDown(); 
      }) 
      .get(); 
    } 

    @Bean 
    public IdempotentReceiverInterceptor idempotentInterceptor() { 
     IdempotentReceiverInterceptor interceptor = new IdempotentReceiverInterceptor(
       m -> !(new String(((byte[]) m.getPayload())).equals("\"foo\""))); 
     interceptor.setDiscardChannel(new NullChannel()); 
     return interceptor; 
    } 

    @Bean 
    public Queue queue() { 
     return new Queue("myqueue"); 
    } 
+0

Я отредактировал мой вопрос. Я поставил свой перехватчик в советах трансформатора, но это не сработало. Не могли бы вы показать мне, что я делаю неправильно? –

+0

В DSL есть ошибка: она не применяется правильно. Я ищу работу. –

+0

Я так думаю, совет завернут в AdvisedRequestHandler, и когда перехватчик проверяет, вызван ли этот экземпляр MessageHandler, он терпит неудачу. –

 Смежные вопросы

  • Нет связанных вопросов^_^