2010-05-10 4 views
6

Я хотел бы отправить сообщение серверу RabbitMQ, а затем дождаться ответного сообщения (в очереди «ответ»). Конечно, я не хочу ждать вечно, если приложение, обрабатывающее эти сообщения, не работает - должен быть тайм-аут. Это звучит как очень простая задача, но я не могу найти способ сделать это. Я столкнулся с этой проблемой как с py-amqplib, так и с RabbitMQ .NET client.Подождите одно сообщение RabbitMQ с таймаутом

Лучшее решение я получил до сих пор является опрос с использованием basic_get с sleep в-между, но это довольно некрасиво:

def _wait_for_message_with_timeout(channel, queue_name, timeout): 
    slept = 0 
    sleep_interval = 0.1 

    while slept < timeout: 
     reply = channel.basic_get(queue_name) 
     if reply is not None: 
      return reply 

     time.sleep(sleep_interval) 
     slept += sleep_interval 

    raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout) 

Безусловно, есть способ лучше?

ответ

8

Я только что добавил поддержку тайм-аута для amqplib в carrot.

Это подкласс amqplib.client0_8.Connection:

http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97

wait_multi является версией channel.wait возможность получать от произвольного числа каналов.

Я предполагаю, что это может быть объединено вверх по течению в какой-то момент.

+1

Теперь это то, что я называю «великим ответом»: «это исправлено»! Принятие - в надежде, что он * слит в amqplib. – EMP

+0

@EMP ха-ха :) смешно :) –

1

Это, похоже, нарушает всю идею асинхронной обработки, но, если вам кажется, что правильный способ сделать это - использовать RpcClient.

+0

Хотя сам RpcClient не является полезным для меня, глядя на его реализации раскрывает подход к использованию: создать 'QueueingBasicConsumer' и подождите в очереди, которая поддерживает тайм-аут. Это не так сложно в .NET, как я боялся. – EMP

2

Приводится пример here с использованием qpid с msg = q.get(timeout=1), который должен делать то, что вы хотите. К сожалению, я не знаю, какие другие клиентские библиотеки AMQP используют тайм-ауты (и, в частности, я не знаю двух указанных вами).

+0

Глядя на источник qpid, он использует тот же подход, что и клиент .NET: 'basic_consume' с очередью и ожидания очереди с таймаутом. Похоже, это то, что я должен буду сделать. – EMP

8

Вот что я в конечном итоге делает в клиенте .NET:

protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs) 
{ 
    var consumer = new QueueingBasicConsumer(Channel); 
    var tag = Channel.BasicConsume(queueName, true, null, consumer); 
    try 
    { 
     object result; 
     if (!consumer.Queue.Dequeue(timeoutMs, out result)) 
      throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs/1000.0)); 

     return ((BasicDeliverEventArgs)result).Body; 
    } 
    finally 
    { 
     Channel.BasicCancel(tag); 
    } 
} 

К сожалению, я не могу сделать то же самое с PY-amqplib, потому что его метод basic_consume не вызывает функцию обратного вызова, если вы звоните channel.wait() и channel.wait() не поддерживает тайм-ауты! Это глупое ограничение (которое я продолжаю работать) означает, что если вы никогда не получите другое сообщение, ваша нить будет заморожена навсегда.

1

Rabbit теперь позволяет добавлять события таймаута. Просто обернуть свой код в попытке поймать, а затем бросить исключения в TimeOut и Отсоединение обработчики:

try{ 
    using (IModel channel = rabbitConnection.connection.CreateModel()) 
    { 
     client = new SimpleRpcClient(channel, "", "", queue); 
     client.TimeoutMilliseconds = 5000; // 5 sec. defaults to infinity 
     client.TimedOut += RpcTimedOutHandler; 
     client.Disconnected += RpcDisconnectedHandler; 
     byte[] replyMessageBytes = client.Call(message); 
     return replyMessageBytes; 
    } 
} 
catch (Exception){ 
    //Handle timeout and disconnect here 
} 
private void RpcDisconnectedHandler(object sender, EventArgs e) 
{ 
    throw new Exception("RPC disconnect exception occured."); 
} 

private void RpcTimedOutHandler(object sender, EventArgs e) 
{ 
    throw new Exception("RPC timeout exception occured."); 
}