2013-09-16 2 views
2

У меня есть Продюсер, отправляющий сообщения на обмен темы. Каждое сообщение содержит ключ маршрутизации. (Извиняюсь за примитивной схеме)Потребляйте несколько очередей по приоритету

 P 
     | 
     X 
    /| |\ 
    /| | \ 
/| | \ 
    Q1 Q2 Q3 Q4 
    | ///
    |///
    |///
    |/// 
     C 

Я использую php-amqplib и пытаю потреблять количество очередей. То, что я пытаюсь достичь, состоит в том, чтобы последовательно проверять каждую очередь, видеть, есть ли у нее сообщение и, если да, обрабатывать его, в противном случае перейти к следующей очереди. Кроме того, если сообщение найдено, снова запустите процесс проверки с Q1. Следующий код не работает, но продемонстрирует логику того, что я хочу делать.

$connection = new AMQPConnection(HOST, PORT, USER, PASS, VHOST); 
$channel = $connection->channel(); 

$channel->exchange_declare('myexchange', 'topic', false, false, false); 

$channel->queue_declare("Q1", false, true, false, false); 
$channel->queue_bind("Q1", 'myexchange', 'priority.1'); 

$channel->queue_declare("Q2", false, true, false, false); 
$channel->queue_bind("Q2", 'myexchange', 'priority.2'); 

$channel->queue_declare("Q3", false, true, false, false); 
$channel->queue_bind("Q3", 'myexchange', 'priority.3'); 

$channel->queue_declare("DFQ4", false, true, false, false); 
$channel->queue_bind("DFQ4", 'myexchange', 'priority.4'); 

$queues = array('Q1','Q2','Q3','Q4'); 

$priority = 0; 
while (1) { 

    $priority = ($priority<4)? $priority+1 : 0; 

    $msg = $channel->basic_consume($queues[$priority], $consumer_tag, false, false, false, false); 
    if(isset($msg->body)) { 
     echo ' [x] ',$msg->delivery_info['routing_key'], "\n"; 
     $channel->basic_ack($msg->delivery_info['delivery_tag']); 
     $priority = 0; 
    } 
} 

$channel->close(); 
$connection->close(); 
+0

Я не понимаю, почему вы хотите проверить очереди снова, вместо обработки сообщения, когда вы его получите. И я не понимаю, почему вам нужен приоритет, когда вы можете просто объявить любое количество желаемых потребителей и обрабатывать все параллельно. – Kethryweryn

+0

Сообщение в Q1 должно быть обработано до одного в Q2, до одного в Q3 и т. Д. Скажем, в какой-то момент были сообщения только в четвертом квартале. Один из них был бы обработан, затем была сделана проверка, чтобы увидеть, появились ли какие-либо сообщения в Q1, Q2 и Q3, прежде чем пытаться обработать следующее сообщение в Q4. Работы по Q1 должны быть выполнены в первую очередь, отсюда и необходимость приоритета. Дуг Барт описал возможное решение [http://dougbarth.github.io/2011/07/01/approximating-priority-with-rabbitmq.html] – agnitio

+0

Вы пробовали свой код с помощью [basic_get] (https: // github. com/videlalvaro/php-amqplib/blob/master/demo/basic_get.php) вместо basic_consume? – Kethryweryn

ответ

0

У меня было такое же требование для моих проектов .NET. В принципе, у меня был подобный подход, но я использовал обмен заголовками для подачи опубликованных сообщений в эти очереди. Созданная мной библиотека подписалась на все эти очереди (с одинаковым значением размера предварительной выборки) и поместила все эти сообщения во внутреннюю очередь памяти, где эти сообщения могут быть отсортированы по их приоритетам. Поэтому в конце конвейера фактический код для обработки сообщений будет считывать их из внутренней очереди, сообщения с более высоким приоритетом будут обработаны, а затем активированы до более низкого приоритета.