2016-05-06 8 views
0

У меня есть несколько руководств, работающих с Spring Boot и RPC через RabbitMQ. Однако, как только я пытаюсь добавить конвертер сообщений Jackson JSON, все это разваливается.Весенняя загрузка и весна AMQP RPC - конвертер не найден для преобразования исключений

Удаленный вызов успешно принят сервером, поэтому я уверен, что это не клиентская конфигурация.

Exchange DATAFLOW_EXCHANGE 
Routing Key  dataflowRunner 
Redelivered  ○ 
Properties 
reply_to: amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAAr0wAAAAAB.MmIZ6Htejtc1qB11G7BBQw== 
priority: 0 
delivery_mode: 2 
headers:  
__TypeId__: org.springframework.remoting.support.RemoteInvocation 
content_encoding: UTF-8 
content_type: application/json 
Payload 
675 bytes 
Encoding: string 


{"methodName":"run","parameterTypes":["dw.dataflow.Dataflow"],"arguments":[{ Valid Dataflow JSON Removed for Brevity } ]} 

Однако следующее исключение выводится:

Caused by: org.springframework.messaging.converter.MessageConversionException: 
No converter found to convert to class dw.dataflow.Dataflow, message=GenericMessage 
[payload=RemoteInvocation: method name 'run'; parameter types [dw.dataflow.Dataflow], headers={amqp_receivedExchange=DATAFLOW_EXCHANGE, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAArRAAAAAQC.PA/bJ6lcUfaP3csAP5v5NA==, amqp_consumerQueue=DATAFLOW_QUEUE, amqp_redelivered=false, amqp_receivedRoutingKey=dataflowRunner, amqp_contentEncoding=UTF-8, amqp_deliveryMode=PERSISTENT, id=adb37c77-c0da-16bd-8df4-b739cfddf89f, amqp_consumerTag=amq.ctag-N_tFCc_Hp9UtQkiXl7FZ8g, contentType=application/json, __TypeId__=org.springframework.remoting.support.RemoteInvocation, timestamp=1462560945203}] 
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:118) 
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:98) 
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138) 
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) 
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) 
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:112) 
... 12 common frames omitted 

Так, на доставку, он знает, что он должен быть объектом dw.dataflow.Dataflow, он просто не может найти конвертер. Тем не менее, у меня есть мой конвертер, определяемый ВЕЗДЕ.

Конфигурация сервера

@Configuration 
@EnableRabbit 
public class RabbitListenerConfiguration { 
    @Autowired 
    ConnectionFactory connectionFactory; 
    @Autowired 
    ObjectMapper  jacksonObjectMapper; 

@Bean 
public TopicExchange exchange() { 
    return new TopicExchange("DATAFLOW_EXCHANGE", true, false); 
} 

@Bean 
public Queue queue() { 
    return new Queue("DATAFLOW_QUEUE", true); 
} 

@Bean 
public AmqpInvokerServiceExporter amqpInvokerServiceExporter() { 
    AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter() ; 
    exporter.setAmqpTemplate(rabbitTemplate()); 
    exporter.setMessageConverter(jackson2JsonMessageConverter()); 
    exporter.setServiceInterface(DataflowRunner.class); 
    exporter.setService(dataflowRunner()); 
    return exporter ; 
} 

@Bean 
public DataflowRunner dataflowRunner() { 
    return new DataflowRunnerServerImpl(); 
} 

@Bean 
public MessageConverter jackson2JsonMessageConverter() { 
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); 
    converter.setJsonObjectMapper(jacksonObjectMapper); 
    return converter; 
} 

@Bean 
public RabbitTemplate rabbitTemplate() { 
    RabbitTemplate template = new RabbitTemplate(connectionFactory); 
    template.setMessageConverter(jackson2JsonMessageConverter()); 
    return template; 
} 


@Bean(name="rabbitListenerContainerFactory") 
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { 
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
    factory.setConnectionFactory(connectionFactory); 
    factory.setMessageConverter(jackson2JsonMessageConverter()); 
    factory.setDefaultRequeueRejected(false); 
    return factory; 
} 

Вот интерфейс Сервис:

public interface DataflowRunner { 
    String run(Dataflow dataflow) throws Exception; 
} 

И конкретная реализация:

public class DataflowRunnerServerImpl implements DataflowRunner { 
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues="DATAFLOW_QUEUE") 
public String run(Dataflow dataflow) throws Exception { 
    // SNIP 
} 

Для улыбок и смешков, я также пытался настроить реализацию сервера класс со следующими аннотациями, но имеет ту же ошибку:

@RabbitHandler 
@RabbitListener(
     bindings = @QueueBinding(key = "dataflowRunner", 
       value = @Queue(value = "DATAFLOW_QUEUE", durable = "true", autoDelete = "false", exclusive = "false"), 
       exchange = @Exchange(value = "DATAFLOW_EXCHANGE", durable = "true", autoDelete = "false", type = "topic"))) 
public String run(Dataflow dataflow) throws Exception { 

Конфигурация клиента

@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort); 
    connectionFactory.setUsername(rabbitUser); 
    connectionFactory.setPassword(rabbitPassword); 
    connectionFactory.setAddresses(rabbitAddresses); 
    return connectionFactory; 
} 

@Bean 
public AmqpAdmin amqpAdmin() { 
    return new RabbitAdmin(connectionFactory()); 
} 

@Bean 
public RabbitTemplate rabbitTemplate() { 
    RabbitTemplate template = new RabbitTemplate(connectionFactory()); 
    template.setMessageConverter(jackson2MessageConverter()); 
    return template; 
} 

ли что-то, кажется, неправильно настроен? Что мне не хватает? У меня есть конвертер, установленный на экспортере услуг, и фабрика контейнеров-слушателей.

Любая помощь и/или мысли оценены.

+0

Пожалуйста, поделитесь этим учебники. Нет необходимости предоставлять какие-либо 'impl'. «AmqpProxyFactoryBean» делает все для вас. Вы немного смешали проблемы: http://docs.spring.io/spring-amqp/reference/html/_reference.html#remoting –

ответ

4

@RabbitListener не предназначен для использования с экспортером услуг - просто простой Java-класс.

Для пружины Сверху по RPC, экспортером услуг является MessageListener для SimpleMessageListenerContainer.

С @RabbitListener есть специальный адаптер-слушатель, который обертывает метод pojo.

Итак, вы, кажется, смешиваете две разные парадигмы.

Ожидается, что ServiceExporter (весенний удаленный доступ) будет соединен с AmqpProxyFactoryBean на стороне клиента с экспортером услуг в качестве слушателя на стороне сервера.

Для простого POJO RPC (который намного новее, чем использование Spring Remoting over RabbitMQ), используйте на стороне клиента @RabbitListener и RabbitTemplate.convertSendAndReceive(). Избавьтесь от PFB и SE.

Можете ли вы объяснить, что привело вас по этому пути, если нам нужно добавить некоторые пояснения к документации.

EDIT

Если вы сделать хотите использовать Spring Remoting (инъекционные интерфейс на стороне клиента, и он «волшебным образом» вызвать службу на стороне сервера), вам нужно избавиться от все вещи фабрики контейнера и просто подключите SimpleMessageListenerContainer и заказывайте экспортера услуг как MessageListener.

Справочное руководство содержит an XML example, но вы можете подключить SMLC как @Bean.

EDIT2

Я запустить некоторые тесты и Spring Remoting над AMQP не работает с JSON, так как объект верхнего уровня является RemoteInvocation - в то время как преобразователь сообщения может повторно создать этот объект, он не имеет введите информацию о фактических аргументах, поэтому она останется связанной с хэш-картой.

На данный момент, если вы должны использовать JSON, шаблон convertSendAndReceive в сочетании с @RabbitListener - это путь сюда. Я открою JIRA-проблему, чтобы узнать, можем ли мы использовать использование Spring Remoting RPC с JSON, но он был действительно разработан для Java Serialization.

+0

Только то, что я искал. Я столкнулся с этой же проблемой, и у меня есть час, читающий Spring AMQP и Spring Remoting и код, и я достиг такого же вывода, что JSON не поддерживается, потому что он не преобразует фактическое значение, а только объект-оболочку. К сожалению, я надеялся разоблачить существующий сервис, который создает JSON, используя эту функцию. Мои текущие DTO не сериализуемы, поэтому я не мог просто использовать обычный конвертер. –

0

Я потратил несколько минут на это, и мне удалось решить проблему с ужасным взломом, который, кажется, работает.

Я в основном расширил классы, участвующие в вызове с обеих сторон, чтобы убедиться, что внутренние аргументы и значение преобразуются в/из строк JSON.

С немного более любовью это может улучшить работу с другими типами данных с использованием других преобразователей, но у меня не было времени на это. Я оставляю вам, если достаточно храбр, чтобы дать ему попробовать :-)

На стороне сервера

Во-первых, я подклассы в AmqpInvokerServiceExporter, чтобы иметь возможность добавить поддержку для преобразования в/из объектов JSON , Первым шагом является преобразование аргументов метода из JSON в соответствующие типы. Второй шаг - преобразовать возвращаемое значение из объекта в соответствующую строку JSON, чтобы отправить его обратно.

public class JSONAmqpInvokerServiceExporter extends AmqpInvokerServiceExporter { 

    private final ObjectMapper objectMapper = new ObjectMapper(); 

    @Override 
    public void onMessage(Message message) { 
     Address replyToAddress = message.getMessageProperties().getReplyToAddress(); 
     if (replyToAddress == null) { 
      throw new AmqpRejectAndDontRequeueException("No replyToAddress in inbound AMQP Message"); 
     } 

     Object invocationRaw = getMessageConverter().fromMessage(message); 

     RemoteInvocationResult remoteInvocationResult; 
     if (invocationRaw == null || !(invocationRaw instanceof RemoteInvocation)) { 
      remoteInvocationResult = new RemoteInvocationResult(
       new IllegalArgumentException("The message does not contain a RemoteInvocation payload")); 
     } 
     else { 
      RemoteInvocation invocation = (RemoteInvocation) invocationRaw; 
      int argCount = invocation.getArguments().length; 
      if (argCount > 0) { 
       Object[] arguments = invocation.getArguments(); 
       Class<?>[] parameterTypes = invocation.getParameterTypes(); 
       for (int i = 0; i < argCount; i++) { 
        try { 
         //convert arguments from JSON strings to objects 
         arguments[i] = objectMapper.readValue(arguments[i].toString(), parameterTypes[i]); 
        } 
        catch (IOException cause) { 
         throw new MessageConversionException(
          "Failed to convert JSON to value: " + arguments[i] + " of type" + parameterTypes[i], cause); 
        } 
       } 
      } 

      remoteInvocationResult = invokeAndCreateResult(invocation, getService()); 
     } 
     send(remoteInvocationResult, replyToAddress); 
    } 

    private void send(RemoteInvocationResult result, Address replyToAddress) { 
     Object value = result.getValue(); 
     if (value != null) { 
      try { 
       //convert the returning value from a model to a JSON string 
       //before we send it back 
       Object json = objectMapper.writeValueAsString(value); 
       result.setValue(json); 
      } 
      catch (JsonProcessingException cause) { 
       throw new MessageConversionException("Failed to convert value to JSON: " + value, cause); 
      } 
     } 
     Message message = getMessageConverter().toMessage(result, new MessageProperties()); 

     getAmqpTemplate().send(replyToAddress.getExchangeName(), replyToAddress.getRoutingKey(), message); 
    } 

} 

Теперь, когда определен этот класс, я изменил определение моего слушателя службы чем-то вроде этого:

<bean id="toteServiceListener" class="amqphack.FFDAmqpInvokerServiceExporter"> 
    <property name="serviceInterface" value="ampqphack.ToteService"/> 
    <property name="service" ref="defaultToteService"/> 
    <property name="amqpTemplate" ref="rabbitTemplate"/> 
</bean> 

<rabbit:listener-container connection-factory="connectionFactory"> 
    <rabbit:listener ref="toteServiceListener" queue-names="tote-service"/> 
</rabbit:listener-container> 

Я использовал регулярные AmqTemplate в этом случае, так как я знаю, что ResultInvocationValue будет всегда преобразованный в строку JSON в любом случае, поэтому я не против, если InvocationResult сериализуется с использованием традиционной сериализации Java.

На стороне клиента

В клиенте я должен был изменить к вещам. Во-первых, мне нужно, чтобы любые аргументы, которые мы отправляем в вызове, были преобразованы в строки JSON до того, как мы это сделаем, но мы по-прежнему сохраняем их типы параметров. К счастью, существующий AmqpProxyFactoryBean принимает параметр remoteInvocationFactory, где мы можем перехватить вызов и изменить его.Так я впервые определил и новый RemoteInvocationFactory:

public class JSONRemoteInvocationFactory implements RemoteInvocationFactory { 

    private final ObjectMapper mapper = new ObjectMapper(); 

    @Override 
    public RemoteInvocation createRemoteInvocation(MethodInvocation methodInvocation) { 
     RemoteInvocation invocation = new RemoteInvocation(methodInvocation); 
     if (invocation.getParameterTypes() != null) { 
      int paramCount = invocation.getParameterTypes().length; 
      Object[] arguments = new Object[paramCount]; 
      try { 
       for (int i = 0; i < paramCount; i++) { 
        arguments[i] = mapper.writeValueAsString(invocation.getArguments()[i]); 
       } 
       invocation.setArguments(arguments); 
      } 
      catch (JsonProcessingException cause) { 
       throw new RuntimeException(
        "Failed converting arguments to json: " + Arrays.toString(invocation.getArguments()), cause); 
      } 
     } 
     return invocation; 
    } 
} 

Но этого недостаточно. Когда мы вернем результат, нам нужно снова вернуть его результат в объект Java. Для этого мы можем использовать ожидаемый тип возвращаемого сервиса. И для этого я расширил существующие AmqpProxyFactoryBean, чтобы просто преобразовать его результат, который, как я знаю, всегда будет String, в Java-модель.

public class JSONAmqpProxyFactoryBean extends AmqpProxyFactoryBean { 

    private final ObjectMapper mapper = DefaultObjectMapper.createDefaultObjectMapper(); 

    @Override 
    public Object invoke(MethodInvocation invocation) throws Throwable { 
     Object ret = super.invoke(invocation); 
     return mapper.readValue(ret.toString(), invocation.getMethod().getReturnType()); 
    } 

} 

И с этим, я был в состоянии определить свою клиентскую сторону примерно так:

<bean id="toteService" class="amqphack.JSONAmqpProxyFactoryBean"> 
    <property name="amqpTemplate" ref="rabbitTemplate"/> 
    <property name="serviceInterface" value="amqphack.ToteService"/> 
    <property name="routingKey" value="tote-service"/> 
    <property name="remoteInvocationFactory" ref="remoteInvocationFactory"/> 
</bean> 

И после этого все работало как шарм:

ToteService toteService = context.getBean("toteService", ToteService.class); 
ToteModel tote = toteService.findTote("18251", "ABCD"); 

Поскольку я не изменяю традиционный конвертер, это означает, что исключения по-прежнему правильно сериализованы в InvocationResult.

0

Не знаю, нужна ли его еще, но так я решил проблему с использованием JSON с AmqpProxyFactoryBean/AmqpInvokerServiceExporter. На стороне клиента я использую конвертер Jackson2JsonMessageConverter и на стороне сервера RemoteInvocationAwareMessageConverterAdapter, который обертывает конвертер Jackson2JsonMessageConverter.

ClientConfig.java:

import com.stayfriends.commons.services.interfaces.GameService; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.amqp.remoting.client.AmqpProxyFactoryBean; 
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; 
import org.springframework.beans.factory.FactoryBean; 
import org.springframework.beans.factory.InitializingBean; 
import org.springframework.beans.factory.annotation.Qualifier; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 

@Configuration 
public class ClientConfig { 

    @Bean 
    public RabbitTemplate gameServiceTemplate(ConnectionFactory connectionFactory, 
               Jackson2JsonMessageConverter messageConverter) { 
     RabbitTemplate template = new RabbitTemplate(connectionFactory); 
     template.setExchange("rpc"); 
     template.setMessageConverter(messageConverter); 
     return template; 
    } 

    @Bean 
    public ServiceAmqpProxyFactoryBean gameServiceProxy2(@Qualifier("gameServiceTemplate") RabbitTemplate template) { 
     return new ServiceAmqpProxyFactoryBean(template); 
    } 


    public static class ServiceAmqpProxyFactoryBean implements FactoryBean<Service>, InitializingBean { 
     private final AmqpProxyFactoryBean proxy; 

     ServiceAmqpProxyFactoryBean(RabbitTemplate template) { 
      proxy = new AmqpProxyFactoryBean(); 
      proxy.setAmqpTemplate(template); 
      proxy.setServiceInterface(GameService.class); 
      proxy.setRoutingKey(GameService.class.getSimpleName()); 
     } 

     @Override 
     public void afterPropertiesSet() { 
      proxy.afterPropertiesSet(); 
     } 

     @Override 
     public Service getObject() throws Exception { 
      return (Service) proxy.getObject(); 
     } 

     @Override 
     public Class<?> getObjectType() { 
      return Service.class; 
     } 

     @Override 
     public boolean isSingleton() { 
      return proxy.isSingleton(); 
     } 
    } 

} 

ServerConfig.java

import org.springframework.amqp.core.*; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer; 
import org.springframework.amqp.rabbit.listener.MessageListenerContainer; 
import org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter; 
import org.springframework.amqp.support.converter.RemoteInvocationAwareMessageConverterAdapter; 
import org.springframework.beans.factory.annotation.Qualifier; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 

@Configuration 
public class ServerConfig { 

    @Bean 
    public DirectExchange serviceExchange() { 
     return new DirectExchange("rpc"); 
    } 

    @Bean 
    public Queue serviceQueue() { 
     return new Queue(Service.class.getSimpleName()); 
    } 

    @Bean 
    public Binding binding(@Qualifier("serviceQueue") Queue queue, @Qualifier("serviceExchange") Exchange exchange) { 
     return BindingBuilder.bind(queue).to(exchange).with(Service.class.getSimpleName()).noargs(); 
    } 

    @Bean("remoteInvocationAwareMessageConverter") 
    @Primary 
    public RemoteInvocationAwareMessageConverterAdapter remoteInvocationAwareMessageConverterAdapter(
     Jackson2JsonMessageConverter jsonMessageConverter) { 
     return new RemoteInvocationAwareMessageConverterAdapter(jsonMessageConverter); 
    } 

    @Bean 
    public AmqpInvokerServiceExporter exporter(RabbitTemplate template, ServiceImpl service, 
               RemoteInvocationAwareMessageConverterAdapter messageConverter) { 
     AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter(); 
     exporter.setAmqpTemplate(template); 
     exporter.setService(service); 
     exporter.setServiceInterface(Service.class); 
     exporter.setMessageConverter(messageConverter); 
     return exporter; 
    } 

    @Bean 
    public MessageListenerContainer container(ConnectionFactory connectionFactory, 
               @Qualifier("serviceQueue") Queue queue, 
               AmqpInvokerServiceExporter exporter) { 
     DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory); 
     container.setQueues(queue); 
     container.setMessageListener(exporter); 
     container.setConsumersPerQueue(5); 
     return container; 
    } 
}