Я использую весеннюю загрузку 1.4.2 и rabbitMQ 1.6.5.RELEASE (не используя пружинный затвор-кролик). В моем проекте есть несколько микросервисов, и большинство микросервисов содержат потребителей rabbitMQ. Один из проектов даст сообщение. Немногие потребители перестают собирать сообщение из очереди, даже если сообщение доступно в очереди. Если я перезапущу этого конкретного потребителя, он начнет потреблять это сообщение. Если я повторно развернуту любого из потребительских микросервисов, то снова несколько потребителей на других компонентах перестанут потреблять сообщение из очереди, но в очереди я вижу сообщение.Spring boot rabbitMQ потребители перестают получать сообщения из-за отказа преобразования json
После просмотра в журналах я могу найти ниже вопрос
com.sample.global.bookStateUpdate.consumer.BookFailConsumer.execute(com.vodafone.smartlife.provisioning.common.model.Message)
throws com.sample.global.bookStateUpdate.consumer.BookFailConsumer.exception.ConsumerException' threw exception
org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:138)
org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:105)
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:778)
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:701)
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:99)
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:191)
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1213)
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:682)
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1191)
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1175)
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1200(SimpleMessageListenerContainer.java:99)
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1338)\n\tat java.lang.Thread.run(Thread.java:745)\n
Caused by: java.lang.NullPointerException: null
com.sample.global.bookStateUpdate.consumer.BookFailConsumer.execute(BookFailConsumer.java:54)
sun.reflect.GeneratedMethodAccessor149.invoke(Unknown Source)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197)
org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115)
org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:49)
org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:125)... 12 common frames omitted\n
Так что, похоже Джексон не смог преобразовать свое сообщение в формате JSON. Поэтому в моем потребителе вместо того, чтобы потреблять фактический объект, я потреблял org.springframework.amqp.core.Message, после чего я вручную преобразовал его в свой пользовательский объект, который работает.
Могу ли я узнать, почему весенний кролик не смог преобразовать сообщение в json?
Пожалуйста, найдите ниже конфигурации & изменение потребительского файла
package com.sample.global.rabbit.configuration;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.test.RabbitListenerTest;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
@EnableRabbit
@RabbitListenerTest(capture = true, spy = true)
public class RabbitMqConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(15);
factory.setMaxConcurrentConsumers(15);
factory.setMessageConverter(jsonMessageConverter());
return factory;
}
@Bean
public CachingConnectionFactory connectionFactory()
{
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("http://localhost:15672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setRequestedHeartBeat(10);
return connectionFactory;
}
@Bean
public MessageConverter jsonMessageConverter()
{
return new Jackson2JsonMessageConverter();
}
@Bean(name = "MainTemplate")
@Primary
public RabbitTemplate rabbitTemplate()
{
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
return template;
}
}
Потребитель
package com.sample.global.rabbit.consumer;
import org.springframework.amqp.core.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
@Component
public class BookFailConsumer {
private static final Logger logger = LoggerFactory.getLogger(BookFailConsumer.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
@Autowired
@Qualifier(value = "MainTemplate")
private RabbitTemplate rabbitTemplate;
@RabbitListener(
id = "book",
bindings = @QueueBinding(
value = @Queue(value = "sample.queue", durable = "true"),
exchange = @Exchange(value = "sample.exchange", durable = "true", delayed = "true"),
key = "sample.queue"
)
)
public void handle(Message messageObject) {
com.sample.global.rabbit.consumer.model.Message message = null;
try {
message = convertMessageBodyToTransaction(messageObject.getBody());
//After manual JSON conversion it works fine.
} catch (Exception e) {
e.printStackTrace();
}
}
public Message convertMessageBodyToTransaction(byte[] messageBody) throws BadRequestException {
com.sample.global.rabbit.consumer.model.Message message = null;
String body = null;
try {
body = new String(messageBody, "UTF-8");
logger.debug("Message body converted successfully to string: {}", body);
} catch (UnsupportedEncodingException e) {
throw new BadRequestException(e);
}
try {
message = objectMapper.readValue(body, Message.class);
logger.debug("Message body mapped successfully to Message object: {}", message.toString());
} catch (Exception e){
logger.error("Message conversion failed for the following message body: {}", body);
throw new BadRequestException(e);
}
return message;
}
}
Есть ли способ, чтобы избежать ручного преобразования? Любой намек было бы полезно, чтобы решить эту проблему
Вам нужно показать полную трассировку стека - вы также должны исправить странное форматирование. –
@GaryRussell Я обновил полную трассировку стека и отформатировал бит. Ваша помощь должна быть заметной – VelNaga
@GaryRussell Хотите ли вы, чтобы я опубликовал код? – VelNaga