Я хотел бы периодически запускать процесс (например, один раз в 10 минут или один раз в час), который получает все сообщения из очереди, обрабатывает их и затем выходит. Есть ли способ сделать это с помощью pika
или использовать другую библиотеку python?Pika: как потреблять сообщения синхронно
ответ
ли эта работа для вас:
- Мера текущая длина очереди в
N = queue.method.message_count
- сделать подсчет обратного вызова обработанных сообщений и, как только
N
обрабатываются, называютchannel.stop_consuming
.
Таким образом, клиентский код будет выглядеть примерно так:
class CountCallback(object):
def __init__(self, count):
self.count = count
def __call__(self, ch, method, properties, body):
# process the message here
self.count -= 1
if not self.count:
ch.stop_consuming()
channel = conn.channel()
queue = channel.queue_declare('tasks')
callback = CountCallback(queue.method.message_count)
channel.basic_consume(callback, queue='tasks')
channel.start_consuming()
Спасибо! Это похоже на то, что я хочу. Но если я правильно понимаю, я не могу одновременно запускать две копии этой программы. Увеличивает ли queue.method.message_count каждый раз, когда сообщение потребляется? Могу ли я проверить эту переменную в обратном вызове? –
Я не планирую запускать две копии программы, но я бы хотел добавить поддержку на случай, если она дважды запускается дважды –
@AlexanderPutilin, насколько я понимаю, 'message_count' является частью ответа AMQP [ 'queue.declare'] (http://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.declare), и я не смог сделать это из запроса обратного вызова. [Этот ответ] (http://stackoverflow.com/a/8193456/770830) может пролить свет. – bereal
Я думаю, что идеальным решением здесь было бы использовать метод basic_get. Он получит одно сообщение, но если очередь уже пуста, она вернет None
. Преимущество этого заключается в том, что вы можете очистить очередь с помощью простого цикла, а затем просто разбить цикл, как только возвращается None
, а также безопасно запускать basic_get с несколькими потребителями.
Этот пример основан на мой собственный библиотека; amqpstorm, но вы можете легко реализовать то же самое с pika.
from amqpstorm import Connection
connection = Connection('127.0.0.1', 'guest', 'guest')
channel = connection.channel()
channel.queue.declare('simple_queue')
while True:
result = channel.basic.get(queue='simple_queue', no_ack=False)
if not result:
print("Channel Empty.")
# We are done, lets break the loop and stop the application.
break
print("Message:", result['body'])
channel.basic.ack(result['method']['delivery_tag'])
channel.close()
connection.close()
Как насчет запуска скрипта python с помощью cron каждые 10 минут? – bereal
Да, это идея. Проблема заключается в получении всех сообщений, их обработке, а затем выходе (не работает бесконечно). –
Если новые сообщения приходят во время работы скрипта, вы предпочитаете обрабатывать их на одном и том же пути или откладывать до следующего раза? – bereal