1

Я использую Spring-boot-starter-amqp 1.4.2.Producer и потребительский режим, но иногда входящие сообщения JSON имеют неправильный синтаксис. Это приводит к следующему (правильному) исключению:Смарт-загрузочная машина RabbitMQ с глобальной ошибкой

org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception 
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content 
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token... 

В будущем я могу столкнуться с большим количеством исключений. Поэтому я хочу настроить глобальный обработчик ошибок, чтобы, если в любом из них есть какой-либо исключение, я могу обработать его по всему миру.

Примечание: В этом случае сообщение не достигло потребителя. Я хочу обрабатывать подобные исключения во всем мире по всему потребителю.

Пожалуйста, найдите следующий код:

RabbitConfiguration.java

@Configuration 
@EnableRabbit 
public class RabbitMqConfiguration { 

    @Autowired 
    private CachingConnectionFactory cachingConnectionFactory; 

    @Bean 
    public MessageConverter jsonMessageConverter() 
    { 
     return new Jackson2JsonMessageConverter(); 
    } 

    @Bean 
    @Primary 
    public RabbitTemplate rabbitTemplate() 
    { 
     RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory); 
     template.setMessageConverter(jsonMessageConverter()); 
     return template; 
    } 

} 

Потребитель

@RabbitListener(
     id = "book_queue", 
     bindings = @QueueBinding(
       value = @Queue(value = "book.queue", durable = "true"), 
       exchange = @Exchange(value = "book.exchange", durable = "true", delayed = "true"), 
       key = "book.queue" 
     ) 
    ) 
public void handle(Message message) { 
//Business Logic 
} 

Может кто-нибудь, пожалуйста, помогите мне справиться с обработчиком ошибок globally.Your помощи должны быть заметными.

Обновленный вопрос согласно Гэри комментарий

я могу в состоянии запустить свой пример и получать ожидаемый результат, как вы сказали, я просто хочу, чтобы попытаться еще несколько негативных случаев основаны на вашем примере, но я не мог» т понять несколько вещей,

this.template.convertAndSend(queue().getName(), new Foo("bar")); 

выход

Поступило: Foo [Foo = бар]

Приведенный выше код работает fine.Now вместо "Foo" я посылаю некоторые другие боб

this.template.convertAndSend(queue().getName(), new Differ("snack","Hihi","how are you")); 

выход

Received: Foo [Foo = NULL]

Потребитель не должен принимать это сообщение, потому что это совершенно другой компонент (Differ.class not Foo.class), поэтому я ожидаю, что он должен перейти в «ConditionalRejectingErrorHandler». Почему он принимает неправильную полезную нагрузку и печатает как null? Пожалуйста, поправьте меня, если я ошибаюсь.

Edit 1:

Гэри, как вы сказали, что я поставил заголовок «TypeID» при отправке сообщения, но все-таки потребитель может в состоянии преобразовать неправильные сообщения и не бросать какие-либо ошибки .. ,пожалуйста, найти код ниже, я использовал свои образцы кода и просто сделал следующие изменения,

1) Добавлен «__TypeId__» Во время отправки сообщения,

this.template.convertAndSend(queue().getName(), new Differ("snack","hihi","how are you"),m -> { 
     m.getMessageProperties().setHeader("__TypeId__","foo"); 
     return m; 
    }); 

2) Добавлена ​​«DefaultClassMapper» в "Jackson2JsonMessageConverter" слушатель контейнера фабрики фасоли

@Bean 
public MessageConverter jsonConverter() { 
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); 
    DefaultClassMapper mapper = new DefaultClassMapper(); 
    mapper.setDefaultType(Foo.class); 
    converter.setClassMapper(mapper); 
    return new Jackson2JsonMessageConverter(); 
}  

ответ

3

Override ботинка, как описано в Enable Listener Endpoint Annotations.

@Bean 
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { 
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
    factory.setConnectionFactory(connectionFactory); 
    factory.setErrorHandler(myErrorHandler()); 
    ... 
    return factory; 
} 

Вы можете вводить пользовательское внедрение ErrorHandler который будет добавляться к каждому слушателю контейнера создает завод.

void handleError(Throwable t); 

Throwable будет ListenerExecutionFailedException, который, начиная с версии 1.6.7 (загрузки 1.4.4), имеет необработанный входящее сообщение в его свойстве failedMessage.

Обработчик ошибок по умолчанию считает, что причины, такие как MessageConversionException, являются фатальными (их не будут требовать).

Если вы хотите сохранить это поведение (нормальное для таких проблем), вы должны выбросить AmqpRejectAndDontRequeueException после обработки ошибки.

Кстати, вам это не нужно RabbitTemplate bean; если в приложении есть только один компонент MessageConverter, загрузка будет автоматически подключена к контейнерам и шаблону.

Поскольку вы станете главной фабрикой ботинка, вы получите .

EDIT

Вы можете использовать по умолчанию ConditionalRejectingErrorHandler, но вводить его с пользовательской реализации FatalExceptionStrategy. Фактически, вы можете подклассифицировать его DefaultExceptionStrategy и переопределить isFatal(Throwable t), затем после передачи ошибки верните super.isFatal(t).

EDIT2

Полный пример; посылает 1 хорошее сообщение и 1 плохой:

package com.example; 

import org.slf4j.Logger; 

import org.springframework.amqp.core.Message; 
import org.springframework.amqp.core.Queue; 
import org.springframework.amqp.rabbit.annotation.RabbitListener; 
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler; 
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException; 
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; 
import org.springframework.amqp.support.converter.MessageConverter; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.context.ConfigurableApplicationContext; 
import org.springframework.context.annotation.Bean; 
import org.springframework.util.ErrorHandler; 

@SpringBootApplication 
public class So42215050Application { 

    public static void main(String[] args) throws Exception { 
     ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args); 
     context.getBean(So42215050Application.class).runDemo(); 
     context.close(); 
    } 

    @Autowired 
    private RabbitTemplate template; 

    private void runDemo() throws Exception { 
     this.template.convertAndSend(queue().getName(), new Foo("bar")); 
     this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> { 
      return new Message("some bad json".getBytes(), m.getMessageProperties()); 
     }); 
     Thread.sleep(5000); 
    } 

    @RabbitListener(queues = "So42215050") 
    public void handle(Foo in) { 
     System.out.println("Received: " + in); 
    } 

    @Bean 
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { 
     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory); 
     factory.setMessageConverter(jsonConverter()); 
     factory.setErrorHandler(errorHandler()); 
     return factory; 
    } 

    @Bean 
    public ErrorHandler errorHandler() { 
     return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy()); 
    } 

    @Bean 
    public Queue queue() { 
     return new Queue("So42215050", false, false, true); 
    } 

    @Bean 
    public MessageConverter jsonConverter() { 
     return new Jackson2JsonMessageConverter(); 
    } 

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy { 

     private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass()); 

     @Override 
     public boolean isFatal(Throwable t) { 
      if (t instanceof ListenerExecutionFailedException) { 
       ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t; 
       logger.error("Failed to process inbound message from queue " 
         + lefe.getFailedMessage().getMessageProperties().getConsumerQueue() 
         + "; failed message: " + lefe.getFailedMessage(), t); 
      } 
      return super.isFatal(t); 
     } 

    } 

    public static class Foo { 

     private String foo; 

     public Foo() { 
      super(); 
     } 

     public Foo(String foo) { 
      this.foo = foo; 
     } 

     public String getFoo() { 
      return this.foo; 
     } 

     public void setFoo(String foo) { 
      this.foo = foo; 
     } 

     @Override 
     public String toString() { 
      return "Foo [foo=" + this.foo + "]"; 
     } 

    } 
} 

Результат:

Received: Foo [foo=bar] 

2017-02-14 09: 42: 50,972 ОШИБКА 44868 --- [cTaskExecutor-1] 5050Application $ MyFatalExceptionStrategy : Не удалось обработать входящее сообщение из очереди So42215050; Сообщение об ошибке: (Тело: «some bad json» MessageProperties [headers = {ТипId = com.example.So42215050Application $ Foo}, timestamp = null, messageId = null, userId = null, полученоUserId = null, appId = null, clusterId = null, type = null, correId = null, correIdString = null, replyTo = null, contentType = application/json , contentEncoding = UTF-8, contentLength = 0, deliveryMode = null, receivedDeliveryMode = PERSISTENT, expiration = null, priority = 0, redelivered = false, receivedExchange =, receivedRoutingKey = So42215050, полученоDelay = null, deliveryTag = 2, messageCount = 0, consumerTag = amq.ctag-P2QqY0PMD1ppX5NnkUPhFA, consumerQueue = So42215050])

EDIT3

JSON не передает никакой информации о типе. По умолчанию тип конвертирования будет выводиться из типа параметра метода. Если вы хотите отклонить все, что не может быть преобразовано в этот тип, вам необходимо настроить конвертер сообщений соответствующим образом.

Например:

@Bean 
public MessageConverter jsonConverter() { 
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); 
    DefaultClassMapper mapper = new DefaultClassMapper(); 
    mapper.setDefaultType(Foo.class); 
    converter.setClassMapper(mapper); 
    return converter; 
} 

Теперь, когда я изменить свой пример, чтобы отправить Bar вместо Foo ...

public static class Bar { 

    ... 

} 

и

this.template.convertAndSend(queue().getName(), new Bar("baz")); 

я получаю. ..

Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message 
... 13 common frames omitted 
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.So42215050Application$Bar] to [com.example.So42215050Application$Foo] for GenericMessage [payload=Bar [foo=baz], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=So42215050, amqp_contentEncoding=UTF-8, amqp_deliveryTag=3, amqp_consumerQueue=So42215050, amqp_redelivered=false, id=6d7e23a3-c2a7-2417-49c9-69e3335aa485, amqp_consumerTag=amq.ctag-6JIGkpmkrTKaG32KVpf8HQ, contentType=application/json, __TypeId__=com.example.So42215050Application$Bar, timestamp=1488489538017}] 

Но это работает только в том случае, если отправитель устанавливает заголовок __TypeId__ (который делает шаблон, если он сконфигурирован с одним и тем же адаптером).

EDIT4

@SpringBootApplication 
public class So42215050Application { 

    private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass()); 

    public static void main(String[] args) throws Exception { 
     ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args); 
     context.getBean(So42215050Application.class).runDemo(); 
     context.close(); 
    } 

    @Autowired 
    private RabbitTemplate template; 

    private void runDemo() throws Exception { 
     this.template.convertAndSend(queue().getName(), new Foo("bar")); // good - converter sets up type 
     this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> { 
      return new Message("some bad json".getBytes(), m.getMessageProperties()); // fail bad json 
     }); 
     Message message = MessageBuilder 
       .withBody("{\"foo\":\"bar\"}".getBytes()) 
       .andProperties(
         MessagePropertiesBuilder 
          .newInstance() 
          .setContentType("application/json") 
          .build()) 
       .build(); 
     this.template.send(queue().getName(), message); // Success - default Foo class when no header 
     message.getMessageProperties().setHeader("__TypeId__", "foo"); 
     this.template.send(queue().getName(), message); // Success - foo is mapped to Foo 
     message.getMessageProperties().setHeader("__TypeId__", "bar"); 
     this.template.send(queue().getName(), message); // fail - mapped to a Map 
     Thread.sleep(5000); 
    } 

    @RabbitListener(queues = "So42215050") 
    public void handle(Foo in) { 
     logger.info("Received: " + in); 
    } 

    @Bean 
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { 
     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory); 
     factory.setMessageConverter(jsonConverter()); 
     factory.setErrorHandler(errorHandler()); 
     return factory; 
    } 

    @Bean 
    public ErrorHandler errorHandler() { 
     return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy()); 
    } 

    @Bean 
    public Queue queue() { 
     return new Queue("So42215050", false, false, true); 
    } 

    @Bean 
    public MessageConverter jsonConverter() { 
     Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); 
     DefaultClassMapper mapper = new DefaultClassMapper(); 
     mapper.setDefaultType(Foo.class); 
     Map<String, Class<?>> mappings = new HashMap<>(); 
     mappings.put("foo", Foo.class); 
     mappings.put("bar", Object.class); 
     mapper.setIdClassMapping(mappings); 
     converter.setClassMapper(mapper); 
     return converter; 
    } 

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy { 

     private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass()); 

     @Override 
     public boolean isFatal(Throwable t) { 
      if (t instanceof ListenerExecutionFailedException) { 
       ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t; 
       logger.error("Failed to process inbound message from queue " 
         + lefe.getFailedMessage().getMessageProperties().getConsumerQueue() 
         + "; failed message: " + lefe.getFailedMessage(), t); 
      } 
      return super.isFatal(t); 
     } 

    } 

    public static class Foo { 

     private String foo; 

     public Foo() { 
      super(); 
     } 

     public Foo(String foo) { 
      this.foo = foo; 
     } 

     public String getFoo() { 
      return this.foo; 
     } 

     public void setFoo(String foo) { 
      this.foo = foo; 
     } 

     @Override 
     public String toString() { 
      return "Foo [foo=" + this.foo + "]"; 
     } 

    } 

    public static class Bar { 

     private String foo; 

     public Bar() { 
      super(); 
     } 

     public Bar(String foo) { 
      this.foo = foo; 
     } 

     public String getFoo() { 
      return this.foo; 
     } 

     public void setFoo(String foo) { 
      this.foo = foo; 
     } 

     @Override 
     public String toString() { 
      return "Bar [foo=" + this.foo + "]"; 
     } 

    } 

} 
+0

Спасибо большое за response.Need еще несколько разъяснений, которые я не понимаю из вашего answer.1) Для реализации глобального обработчика ошибок, мы должны иметь боб «SimpleRabbitListenerContainerFactory»? (Нет другого способа) 2) Я вижу «ErrorHandler» как метод. Можно ли определить bean-компонент как «ErrorHandler»? Не могли бы вы поделиться примером кода для эффективного способа написания ErrorHandler? По крайней мере, намек или ссылка. 3) Вы упомянули, что один «MessageConverter» является избыточным, и загрузка будет автоматически подключена к контейнерам. Для меня весенняя сапочка не делает автоматически, что я что-то пропустил? – VelNaga

+0

См. Мое последнее редактирование для полного рабочего примера. –

+0

Это теперь доступно в [spring-amqp-samples] (https://github.com/spring-projects/spring-amqp-samples) как 'spring-rabbit-global-errorhandler'. –