2014-09-14 2 views
3

Я хотел бы периодически запускать процесс (например, один раз в 10 минут или один раз в час), который получает все сообщения из очереди, обрабатывает их и затем выходит. Есть ли способ сделать это с помощью pika или использовать другую библиотеку python?Pika: как потреблять сообщения синхронно

+0

Как насчет запуска скрипта python с помощью cron каждые 10 минут? – bereal

+0

Да, это идея. Проблема заключается в получении всех сообщений, их обработке, а затем выходе (не работает бесконечно). –

+0

Если новые сообщения приходят во время работы скрипта, вы предпочитаете обрабатывать их на одном и том же пути или откладывать до следующего раза? – bereal

ответ

3

ли эта работа для вас:

  1. Мера текущая длина очереди в N = queue.method.message_count
  2. сделать подсчет обратного вызова обработанных сообщений и, как только N обрабатываются, называют channel.stop_consuming.

Таким образом, клиентский код будет выглядеть примерно так:

class CountCallback(object): 
    def __init__(self, count): 
     self.count = count 

    def __call__(self, ch, method, properties, body): 
     # process the message here 
     self.count -= 1 
     if not self.count: 
      ch.stop_consuming() 

channel = conn.channel() 
queue = channel.queue_declare('tasks') 
callback = CountCallback(queue.method.message_count) 
channel.basic_consume(callback, queue='tasks') 
channel.start_consuming() 
+0

Спасибо! Это похоже на то, что я хочу. Но если я правильно понимаю, я не могу одновременно запускать две копии этой программы. Увеличивает ли queue.method.message_count каждый раз, когда сообщение потребляется? Могу ли я проверить эту переменную в обратном вызове? –

+0

Я не планирую запускать две копии программы, но я бы хотел добавить поддержку на случай, если она дважды запускается дважды –

+1

@AlexanderPutilin, насколько я понимаю, 'message_count' является частью ответа AMQP [ 'queue.declare'] (http://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.declare), и я не смог сделать это из запроса обратного вызова. [Этот ответ] (http://stackoverflow.com/a/8193456/770830) может пролить свет. – bereal

5

Я думаю, что идеальным решением здесь было бы использовать метод basic_get. Он получит одно сообщение, но если очередь уже пуста, она вернет None. Преимущество этого заключается в том, что вы можете очистить очередь с помощью простого цикла, а затем просто разбить цикл, как только возвращается None, а также безопасно запускать basic_get с несколькими потребителями.

Этот пример основан на мой собственный библиотека; amqpstorm, но вы можете легко реализовать то же самое с pika.

from amqpstorm import Connection 

connection = Connection('127.0.0.1', 'guest', 'guest') 
channel = connection.channel() 
channel.queue.declare('simple_queue') 
while True: 
    result = channel.basic.get(queue='simple_queue', no_ack=False) 
    if not result: 
     print("Channel Empty.") 
     # We are done, lets break the loop and stop the application. 
     break 
    print("Message:", result['body']) 
    channel.basic.ack(result['method']['delivery_tag']) 
channel.close() 
connection.close() 

 Смежные вопросы

  • Нет связанных вопросов^_^