2016-02-05 2 views
2

Я изо всех сил стараюсь найти способ для запланированных/отложенных сообщений весной AMQP/Rabbit MQ.
После долгих поисков я не могу этого сделать весной AMQP. Может кто-нибудь, пожалуйста, скажите мне, как сделать x-delay весной AMQP.
Я хочу задержать сообщение, если какое-то исключение возникает в потребительской стороне. RabbitMQ говорит, чтобы добавить й задержку и установить плагин, который я уже сделал, но все сообщения сразу пришедшими без каких-либо задержекЗапланированная/Задержка обмена сообщениями весной AMQP RabbitMq



Я получаю это сообщение
Received < (Body: '[B @ 60a4ae5f (байт [26]) 'MessageProperties [заголовки = {х задержка = 15000}

@Bean 
ConnectionFactory connectionFactory(){ 

    CachingConnectionFactory connectionFactory=new CachingConnectionFactory("127.0.0.1"); 
    connectionFactory.setUsername("guest"); 
    connectionFactory.setPassword("guest"); 
    connectionFactory.setPort(1500); 
    connectionFactory.setPublisherReturns(true); 
    return connectionFactory; 

} 

@Bean 
Binding binding(@Qualifier("queue")Queue queue, DirectExchange exchange) { 
    return new Binding(queue.getName(), Binding.DestinationType.QUEUE, exchange.getName(), queue.getName(), null); 
    //return BindingBuilder.bind(queue).to(exchange).with(queueName); 
} 

@Bean 
DirectExchange exchange() { 
    DirectExchange exchange=new DirectExchange("delay-exchange"); 
    return exchange; 
} 

Потребитель ---
@Override

public void onMessage(Message message, Channel channel) throws Exception { 

    System.out.println("Received <" + message+ ">" +rabbitTemplate); 

    if(i==1){ 
     AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder(); 
     Map<String,Object> headers = message.getMessageProperties().getHeaders(); 
     headers.put("x-delay", 15000); 
     props.headers(headers); 
     i++; 
     channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), 
       props.build(), message.getBody()); 
    } 
    } 
+1

Не уверен, кто рекомендовал вам пометить этот вопрос тегом 'jms', но это неверно. Это касается только RabbitMQ. И да, 'spring-amqp'. Фиксация ... –

ответ

4

Прежде всего выглядит, как вы не следуешь с Scheduling Messages with RabbitMQ статьи:

Для использование отложенных сообщений Exchange вам просто нужно объявить обмен, обеспечивающий «х-отсроченное сообщение» Тип обмен следующим образом:

Map<String, Object> args = new HashMap<String, Object>(); 
args.put("x-delayed-type", "direct"); 
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args); 

Я бы сказал, что то же самое может быть достигнуто с помощью Spring AMQP:

@Bean 
CustomExchange delayExchange() { 
    Map<String, Object> args = new HashMap<String, Object>(); 
    args.put("x-delayed-type", "direct"); 
    return new CustomExchange("my-exchange", "x-delayed-message", true, false, args); 
} 

Другая проблема, что йо u действительно должен публиковать сообщения к этому delay-exchange, а не к другим. Опять же: это упоминается в этом документе в любом случае.

ОБНОВЛЕНИЕ

С весны AMQP 1.6 запаздывающего сообщения поддерживается, как вне коробки особенность: https://spring.io/blog/2016/02/16/spring-amqp-1-6-0-milestone-1-and-1-5-4-available.

+0

Да, я не получал этот тип сообщения с задержкой в ​​Exchange, как это было не в Spring AMQP. Я прошел через это много раз, но не смог понять, что это тип Exchange. В любом случае, я могу сделать это сейчас. –

+0

Спасибо за сообщение в Spring AMQP, а также ... –

+0

@Artem Где я должен искать spring-rabbit-1.6.xsd –

 Смежные вопросы

  • Нет связанных вопросов^_^