0

Я использую весеннюю загрузку 1.4.2 и rabbitMQ 1.6.5.RELEASE (не используя пружинный затвор-кролик). В моем проекте есть несколько микросервисов, и большинство микросервисов содержат потребителей rabbitMQ. Один из проектов даст сообщение. Немногие потребители перестают собирать сообщение из очереди, даже если сообщение доступно в очереди. Если я перезапущу этого конкретного потребителя, он начнет потреблять это сообщение. Если я повторно развернуту любого из потребительских микросервисов, то снова несколько потребителей на других компонентах перестанут потреблять сообщение из очереди, но в очереди я вижу сообщение.Spring boot rabbitMQ потребители перестают получать сообщения из-за отказа преобразования json

После просмотра в журналах я могу найти ниже вопрос

com.sample.global.bookStateUpdate.consumer.BookFailConsumer.execute(com.vodafone.smartlife.provisioning.common.model.Message) 
throws com.sample.global.bookStateUpdate.consumer.BookFailConsumer.exception.ConsumerException' threw exception 
org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:138) 
org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:105) 
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:778) 
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:701) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:99) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:191) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1213) 
org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:682) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1191) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1175) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1200(SimpleMessageListenerContainer.java:99) 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1338)\n\tat java.lang.Thread.run(Thread.java:745)\n 
Caused by: java.lang.NullPointerException: null 
com.sample.global.bookStateUpdate.consumer.BookFailConsumer.execute(BookFailConsumer.java:54) 
sun.reflect.GeneratedMethodAccessor149.invoke(Unknown Source)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
java.lang.reflect.Method.invoke(Method.java:498)org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) 
    org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) 
org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:49) 
org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:125)... 12 common frames omitted\n 

Так что, похоже Джексон не смог преобразовать свое сообщение в формате JSON. Поэтому в моем потребителе вместо того, чтобы потреблять фактический объект, я потреблял org.springframework.amqp.core.Message, после чего я вручную преобразовал его в свой пользовательский объект, который работает.

Могу ли я узнать, почему весенний кролик не смог преобразовать сообщение в json?

Пожалуйста, найдите ниже конфигурации & изменение потребительского файла

package com.sample.global.rabbit.configuration; 

import org.springframework.amqp.rabbit.annotation.EnableRabbit; 
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; 
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.amqp.rabbit.test.RabbitListenerTest; 
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; 
import org.springframework.amqp.support.converter.MessageConverter; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.context.annotation.Primary; 

@Configuration 
@EnableRabbit 
@RabbitListenerTest(capture = true, spy = true) 
public class RabbitMqConfiguration { 

@Bean 
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { 
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
    factory.setConnectionFactory(connectionFactory()); 
    factory.setConcurrentConsumers(15); 
    factory.setMaxConcurrentConsumers(15); 
    factory.setMessageConverter(jsonMessageConverter()); 

    return factory; 
} 

@Bean 
public CachingConnectionFactory connectionFactory() 
{ 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("http://localhost:15672"); 
    connectionFactory.setUsername("guest"); 
    connectionFactory.setPassword("guest"); 
    connectionFactory.setRequestedHeartBeat(10); 
    return connectionFactory; 
} 

@Bean 
public MessageConverter jsonMessageConverter() 
{ 
    return new Jackson2JsonMessageConverter(); 
} 

@Bean(name = "MainTemplate") 
@Primary 
public RabbitTemplate rabbitTemplate() 
{ 
    RabbitTemplate template = new RabbitTemplate(connectionFactory()); 
    template.setMessageConverter(jsonMessageConverter()); 
    return template; 
} 

} 

Потребитель

package com.sample.global.rabbit.consumer; 

import org.springframework.amqp.core.Message; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.amqp.rabbit.annotation.Exchange; 
import org.springframework.amqp.rabbit.annotation.Queue; 
import org.springframework.amqp.rabbit.annotation.QueueBinding; 
import org.springframework.amqp.rabbit.annotation.RabbitListener; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.beans.factory.annotation.Qualifier; 
import org.springframework.stereotype.Component; 

@Component 
public class BookFailConsumer { 

private static final Logger logger = LoggerFactory.getLogger(BookFailConsumer.class); 

private static final ObjectMapper objectMapper = new ObjectMapper(); 

@Autowired 
@Qualifier(value = "MainTemplate") 
private RabbitTemplate rabbitTemplate; 

@RabbitListener(
     id = "book", 
     bindings = @QueueBinding(
       value = @Queue(value = "sample.queue", durable = "true"), 
       exchange = @Exchange(value = "sample.exchange", durable = "true", delayed = "true"), 
       key = "sample.queue" 
     ) 
) 
public void handle(Message messageObject) { 
    com.sample.global.rabbit.consumer.model.Message message = null; 
    try { 
     message = convertMessageBodyToTransaction(messageObject.getBody()); 
     //After manual JSON conversion it works fine. 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

public Message convertMessageBodyToTransaction(byte[] messageBody) throws BadRequestException { 
    com.sample.global.rabbit.consumer.model.Message message = null; 
    String body = null; 
    try { 
     body = new String(messageBody, "UTF-8"); 
     logger.debug("Message body converted successfully to string: {}", body); 
    } catch (UnsupportedEncodingException e) { 
     throw new BadRequestException(e); 
    } 
    try { 
     message = objectMapper.readValue(body, Message.class); 
     logger.debug("Message body mapped successfully to Message object: {}", message.toString()); 
    } catch (Exception e){ 
     logger.error("Message conversion failed for the following message body: {}", body); 
     throw new BadRequestException(e); 
    } 
    return message; 
} 

}

Есть ли способ, чтобы избежать ручного преобразования? Любой намек было бы полезно, чтобы решить эту проблему

+0

Вам нужно показать полную трассировку стека - вы также должны исправить странное форматирование. –

+0

@GaryRussell Я обновил полную трассировку стека и отформатировал бит. Ваша помощь должна быть заметной – VelNaga

+0

@GaryRussell Хотите ли вы, чтобы я опубликовал код? – VelNaga

ответ

0

В журнале причина является NullPointerException в BookFailConsumer.execute

Так что потребитель получал сообщение и не BookConsumer, как вы, кажется, ожидать. Вам нужно проверить, почему был вызван BookFailConsumer и что там происходит. (Вы не разместили код BookFailConsumer)

+0

Спасибо за ваш комментарий. На самом деле это ошибка опечатки при отправке кода. У меня есть только один потребитель, который является «BookFailConsumer». У меня нет потребителя под названием «BookConsumer», также я упомянул, что могу получить «org.springframework.amqp.core.Message» внутри потребителя. – VelNaga