2015-08-31 1 views
3

Я пытаюсь использовать Delayed Message Queue для RabbitMQ с PHP, но мои сообщения просто исчезают.Как использовать задержанную очередь сообщений RabbitMQ из PHP?

я объявляю обмен со следующим кодом:

$this->channel->exchange_declare(
    'delay', 
    'x-delayed-message', 
    false, /* passive, create if exchange doesn't exist */ 
    true, /* durable, persist through server reboots */ 
    false, /* autodelete */ 
    false, /* internal */ 
    false, /* nowait */ 
    ['x-delayed-type' => ['S', 'direct']]); 

Я связывание очереди с этим кодом:

$this->channel->queue_declare(
    $queueName, 
    false, /* Passive */ 
    true, /* Durable */ 
    false, /* Exclusive */ 
    false /* Auto Delete */ 
); 
$this->channel->queue_bind($queueName, "delay", $queueName); 

И я публикую сообщение с этим кодом:

$msg = new AMQPMessage(json_encode($msgData), [ 
    'delivery_mode' => 2, 
    'x-delay' => 5000]); 
$this->channel->basic_publish($msg, 'delay', $queueName); 

Но сообщение не задерживается; он по-прежнему сразу доставлен. Что мне не хватает?

+3

См. Ответ здесь, как установить заголовок задержки: https://groups.google.com/d/msg/rabbitmq-users/vJEG7tdzi4E/lLXF4mhoAAAJ –

ответ

3

От here,

Создание сообщения должно быть

require_once __DIR__ . '/vendor/autoload.php'; 
use PhpAmqpLib\Message\AMQPMessage; 
use PhpAmqpLib\Wire\AMQPTable; 

$msg = new AMQPMessage($data, 
      array(
       'delivery_mode' => 2, # make message persistent 
       'application_headers' => new AMQPTable([ 
        'x-delay' => 5000 
       ]) 
      ) 
     ); 
1

вам нужен ключ маршрутизации для публикации из обмена в очередь.

Причина, по которой публикация во встроенных операциях прямого обмена заключается в том, что этот обмен является особым случаем, который использует ключ маршрутизации в качестве имени очереди адресата.

для всех обменов и очередей, которые вы создаете, вам необходимо создать привязку между обменом и очередью с помощью ключа маршрутизации. то вы публикуете сообщение с этим ключом маршрутизации вместо имени очереди назначения.

я не знаю PHP код для создания привязки ... но в целом выглядит примерно так:

channel.bind(exhange_name, queue_name, routing_key)

то в вашей публикации сообщения:

$this->channel->basic_publish($msg, 'delay', $routing_key);

+0

Спасибо. Я думаю, что я приближаюсь к этой работе. –

+0

Спасибо. Сообщение доставляется через плагин, но не принимает мой параметр задержки. Я обновил свой вопрос. –

+0

Я думаю, вам может потребоваться установить «x-delay» в «properties-> headers», а не в «свойствах» напрямую. попробуйте это и посмотрите, работает ли он –

3

Ответ для тех, кто нуждается в задержке сообщения, но не хочет вникать в детали. Вам нужно всего несколько вещей, чтобы заставить его работать:

Установить amqp interop совместимый транспорт, например enqueue/amqp-bunny и enqueue/amqp-tools.

composer require enqueue/amqp-bunny enqueue/amqp-tools 

Создать AMQP контекст, добавьте стратегию задержки и отправить отсроченные сообщения:

<?php 
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy; 
use Enqueue\AmqpBunny\AmqpConnectionFactory; 

$context = (new AmqpConnectionFactory('amqp://'))->createContext(); 
$context->setDelayStrategy(new RabbitMqDelayPluginDelayStrategy()) 

$queue = $context->createQueue('foo'); 
$context->declareQueue($queue); 

$message = $context->createMessage('Hello world!'); 

$context->createProducer() 
    ->setDeliveryDelay(5000) // 5 sec 
    ->send($queue, $message) 
; 

Кстати, это не единственная стратегия отсутствует. существует один, основанный на очередях с мертвой буквой RabbitMQ + ttl. Его можно использовать одинаково.