Вы можете просто применить перехватчик к первому компоненту ниже по потоку от входящего адаптера. Если первым каналом является pub/sub, добавьте мост.
В качестве альтернативы вы можете написать индивидуальный совет и добавить его в цепочку консультаций контейнера входящего адаптера.
EDIT
@SpringBootApplication
public class So40289644Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So40289644Application.class, args);
RabbitTemplate template = context.getBean(RabbitTemplate.class);
template.convertAndSend("myqueue", "foo");
template.convertAndSend("myqueue", "bar");
context.getBean(CountDownLatch.class).await(10, TimeUnit.SECONDS);
context.getBean(RabbitAdmin.class).declareQueue(new Queue("myqueue"));
context.close();
}
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public CountDownLatch latch() {
return new CountDownLatch(1);
}
@Bean
IntegrationFlow someFlow(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "myqueue"))
.channel("in")
.get();
}
@Bean
@Transformer(inputChannel = "in", adviceChain = "idempotentInterceptor", outputChannel = "next")
public JsonToObjectTransformer transformer() {
return Transformers.fromJson();
}
@Bean
IntegrationFlow remaining() {
return IntegrationFlows.from("next")
.handle(m -> {
System.out.println(m);
latch().countDown();
})
.get();
}
@Bean
public IdempotentReceiverInterceptor idempotentInterceptor() {
IdempotentReceiverInterceptor interceptor = new IdempotentReceiverInterceptor(
m -> !(new String(((byte[]) m.getPayload())).equals("\"foo\"")));
interceptor.setDiscardChannel(new NullChannel());
return interceptor;
}
@Bean
public Queue queue() {
return new Queue("myqueue");
}
Я отредактировал мой вопрос. Я поставил свой перехватчик в советах трансформатора, но это не сработало. Не могли бы вы показать мне, что я делаю неправильно? –
В DSL есть ошибка: она не применяется правильно. Я ищу работу. –
Я так думаю, совет завернут в AdvisedRequestHandler, и когда перехватчик проверяет, вызван ли этот экземпляр MessageHandler, он терпит неудачу. –