Override ботинка, как описано в Enable Listener Endpoint Annotations.
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setErrorHandler(myErrorHandler());
...
return factory;
}
Вы можете вводить пользовательское внедрение ErrorHandler
который будет добавляться к каждому слушателю контейнера создает завод.
void handleError(Throwable t);
Throwable будет ListenerExecutionFailedException
, который, начиная с версии 1.6.7 (загрузки 1.4.4), имеет необработанный входящее сообщение в его свойстве failedMessage
.
Обработчик ошибок по умолчанию считает, что причины, такие как MessageConversionException
, являются фатальными (их не будут требовать).
Если вы хотите сохранить это поведение (нормальное для таких проблем), вы должны выбросить AmqpRejectAndDontRequeueException
после обработки ошибки.
Кстати, вам это не нужно RabbitTemplate
bean; если в приложении есть только один компонент MessageConverter
, загрузка будет автоматически подключена к контейнерам и шаблону.
Поскольку вы станете главной фабрикой ботинка, вы получите .
EDIT
Вы можете использовать по умолчанию ConditionalRejectingErrorHandler
, но вводить его с пользовательской реализации FatalExceptionStrategy
. Фактически, вы можете подклассифицировать его DefaultExceptionStrategy
и переопределить isFatal(Throwable t)
, затем после передачи ошибки верните super.isFatal(t)
.
EDIT2
Полный пример; посылает 1 хорошее сообщение и 1 плохой:
package com.example;
import org.slf4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.util.ErrorHandler;
@SpringBootApplication
public class So42215050Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
context.getBean(So42215050Application.class).runDemo();
context.close();
}
@Autowired
private RabbitTemplate template;
private void runDemo() throws Exception {
this.template.convertAndSend(queue().getName(), new Foo("bar"));
this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
return new Message("some bad json".getBytes(), m.getMessageProperties());
});
Thread.sleep(5000);
}
@RabbitListener(queues = "So42215050")
public void handle(Foo in) {
System.out.println("Received: " + in);
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonConverter());
factory.setErrorHandler(errorHandler());
return factory;
}
@Bean
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
}
@Bean
public Queue queue() {
return new Queue("So42215050", false, false, true);
}
@Bean
public MessageConverter jsonConverter() {
return new Jackson2JsonMessageConverter();
}
public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());
@Override
public boolean isFatal(Throwable t) {
if (t instanceof ListenerExecutionFailedException) {
ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
logger.error("Failed to process inbound message from queue "
+ lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
+ "; failed message: " + lefe.getFailedMessage(), t);
}
return super.isFatal(t);
}
}
public static class Foo {
private String foo;
public Foo() {
super();
}
public Foo(String foo) {
this.foo = foo;
}
public String getFoo() {
return this.foo;
}
public void setFoo(String foo) {
this.foo = foo;
}
@Override
public String toString() {
return "Foo [foo=" + this.foo + "]";
}
}
}
Результат:
Received: Foo [foo=bar]
2017-02-14 09: 42: 50,972 ОШИБКА 44868 --- [cTaskExecutor-1] 5050Application $ MyFatalExceptionStrategy : Не удалось обработать входящее сообщение из очереди So42215050; Сообщение об ошибке: (Тело: «some bad json» MessageProperties [headers = {ТипId = com.example.So42215050Application $ Foo}, timestamp = null, messageId = null, userId = null, полученоUserId = null, appId = null, clusterId = null, type = null, correId = null, correIdString = null, replyTo = null, contentType = application/json , contentEncoding = UTF-8, contentLength = 0, deliveryMode = null, receivedDeliveryMode = PERSISTENT, expiration = null, priority = 0, redelivered = false, receivedExchange =, receivedRoutingKey = So42215050, полученоDelay = null, deliveryTag = 2, messageCount = 0, consumerTag = amq.ctag-P2QqY0PMD1ppX5NnkUPhFA, consumerQueue = So42215050])
EDIT3
JSON не передает никакой информации о типе. По умолчанию тип конвертирования будет выводиться из типа параметра метода. Если вы хотите отклонить все, что не может быть преобразовано в этот тип, вам необходимо настроить конвертер сообщений соответствующим образом.
Например:
@Bean
public MessageConverter jsonConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
DefaultClassMapper mapper = new DefaultClassMapper();
mapper.setDefaultType(Foo.class);
converter.setClassMapper(mapper);
return converter;
}
Теперь, когда я изменить свой пример, чтобы отправить Bar
вместо Foo
...
public static class Bar {
...
}
и
this.template.convertAndSend(queue().getName(), new Bar("baz"));
я получаю. ..
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.So42215050Application$Bar] to [com.example.So42215050Application$Foo] for GenericMessage [payload=Bar [foo=baz], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=So42215050, amqp_contentEncoding=UTF-8, amqp_deliveryTag=3, amqp_consumerQueue=So42215050, amqp_redelivered=false, id=6d7e23a3-c2a7-2417-49c9-69e3335aa485, amqp_consumerTag=amq.ctag-6JIGkpmkrTKaG32KVpf8HQ, contentType=application/json, __TypeId__=com.example.So42215050Application$Bar, timestamp=1488489538017}]
Но это работает только в том случае, если отправитель устанавливает заголовок __TypeId__
(который делает шаблон, если он сконфигурирован с одним и тем же адаптером).
EDIT4
@SpringBootApplication
public class So42215050Application {
private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
context.getBean(So42215050Application.class).runDemo();
context.close();
}
@Autowired
private RabbitTemplate template;
private void runDemo() throws Exception {
this.template.convertAndSend(queue().getName(), new Foo("bar")); // good - converter sets up type
this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
return new Message("some bad json".getBytes(), m.getMessageProperties()); // fail bad json
});
Message message = MessageBuilder
.withBody("{\"foo\":\"bar\"}".getBytes())
.andProperties(
MessagePropertiesBuilder
.newInstance()
.setContentType("application/json")
.build())
.build();
this.template.send(queue().getName(), message); // Success - default Foo class when no header
message.getMessageProperties().setHeader("__TypeId__", "foo");
this.template.send(queue().getName(), message); // Success - foo is mapped to Foo
message.getMessageProperties().setHeader("__TypeId__", "bar");
this.template.send(queue().getName(), message); // fail - mapped to a Map
Thread.sleep(5000);
}
@RabbitListener(queues = "So42215050")
public void handle(Foo in) {
logger.info("Received: " + in);
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonConverter());
factory.setErrorHandler(errorHandler());
return factory;
}
@Bean
public ErrorHandler errorHandler() {
return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
}
@Bean
public Queue queue() {
return new Queue("So42215050", false, false, true);
}
@Bean
public MessageConverter jsonConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
DefaultClassMapper mapper = new DefaultClassMapper();
mapper.setDefaultType(Foo.class);
Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("foo", Foo.class);
mappings.put("bar", Object.class);
mapper.setIdClassMapping(mappings);
converter.setClassMapper(mapper);
return converter;
}
public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {
private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());
@Override
public boolean isFatal(Throwable t) {
if (t instanceof ListenerExecutionFailedException) {
ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
logger.error("Failed to process inbound message from queue "
+ lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
+ "; failed message: " + lefe.getFailedMessage(), t);
}
return super.isFatal(t);
}
}
public static class Foo {
private String foo;
public Foo() {
super();
}
public Foo(String foo) {
this.foo = foo;
}
public String getFoo() {
return this.foo;
}
public void setFoo(String foo) {
this.foo = foo;
}
@Override
public String toString() {
return "Foo [foo=" + this.foo + "]";
}
}
public static class Bar {
private String foo;
public Bar() {
super();
}
public Bar(String foo) {
this.foo = foo;
}
public String getFoo() {
return this.foo;
}
public void setFoo(String foo) {
this.foo = foo;
}
@Override
public String toString() {
return "Bar [foo=" + this.foo + "]";
}
}
}
Спасибо большое за response.Need еще несколько разъяснений, которые я не понимаю из вашего answer.1) Для реализации глобального обработчика ошибок, мы должны иметь боб «SimpleRabbitListenerContainerFactory»? (Нет другого способа) 2) Я вижу «ErrorHandler» как метод. Можно ли определить bean-компонент как «ErrorHandler»? Не могли бы вы поделиться примером кода для эффективного способа написания ErrorHandler? По крайней мере, намек или ссылка. 3) Вы упомянули, что один «MessageConverter» является избыточным, и загрузка будет автоматически подключена к контейнерам. Для меня весенняя сапочка не делает автоматически, что я что-то пропустил? – VelNaga
См. Мое последнее редактирование для полного рабочего примера. –
Это теперь доступно в [spring-amqp-samples] (https://github.com/spring-projects/spring-amqp-samples) как 'spring-rabbit-global-errorhandler'. –