У меня проблема. Мой клиент pika постоянно выходит из строя с сообщением об ошибке.Pika сбой без видимой причины
Это то, что происходит:
- RabbitMQ работает, а производитель уже толкнул сообщения в очередь
- я начинаю мой питон скрипт и обрабатывает все пакеты, буферизованные в очереди
- Мой сценарий периодически бросает Exeption: ConnectionClosed, однако я никогда не закрывать что-нибудь где-нибудь
Это мой код:
import pika
import traceback
class RPCServer(object):
def __init__(self, callback, cfg):
self.cfg = cfg
self.callback = callback
self.credentials = None
self.parameters = None
self.connection = None
self.channel = None
self.counter = 0
self.initalize_me()
def initalize_me(self):
self.credentials = pika.PlainCredentials(self.cfg.USER, self.cfg.PASSWORD)
self.parameters = pika.ConnectionParameters(host=self.cfg.AMQP_HOST, credentials=self.credentials)
self.connection = pika.BlockingConnection(self.parameters)
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange=self.cfg.RPC_EXCHANGE_NAME, type="direct")
self.channel.queue_declare(queue=self.cfg.RPC_QUEUE_NAME)
self.channel.queue_bind(exchange=self.cfg.RPC_EXCHANGE_NAME, queue=self.cfg.RPC_QUEUE_NAME, routing_key=self.cfg.RPC_ROUTING_KEY)
self.channel.basic_consume(self.rpc_callback, queue=self.cfg.RPC_QUEUE_NAME)
print "init= " + str(self.cfg.RPC_EXCHANGE_NAME) + " -> " + str(self.cfg.RPC_QUEUE_NAME) + " -> " + str(self.cfg.RPC_ROUTING_KEY)
def start_rpc_server(self):
print "Server: Start listening for RPC requests..."
try:
self.channel.start_consuming()
except:
print "Exception: " + str(traceback.format_exc())
self.initalize_me()
self.start_rpc_server()
def rpc_callback(self, ch, method, props, body):
self.counter += 1
if self.counter == 100:
print "100 package processed..."
self.counter = 0
result = self.callback(body)
properties = pika.BasicProperties(correlation_id=props.correlation_id)
ch.basic_publish(exchange="", routing_key=props.reply_to, properties=properties, body=result)
ch.basic_ack(delivery_tag=method.delivery_tag)
И это выход, когда я запускаю его:
python RunPacer.py
Initialize Configuration
Start Pacer
100 package processed...
100 package processed...
init= pacing_exchange_debug -> pacing_queue_debug -> pacing_routing_key_debug
Server: Start listening for RPC requests...
Exception: Traceback (most recent call last):
File "/home/Tom/Pacer/amqp/RPCServer.py", line 46, in start_rpc_server
self.channel.start_consuming()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 955, in start_consuming
self.connection.process_data_events()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 243, in process_data_events
raise exceptions.ConnectionClosed()
ConnectionClosed
100 package processed...
100 package processed...
init= pacing_exchange_debug -> pacing_queue_debug -> pacing_routing_key_debug
Server: Start listening for RPC requests...
Exception: Traceback (most recent call last):
File "/home/Tom/Pacer/amqp/RPCServer.py", line 46, in start_rpc_server
self.channel.start_consuming()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 955, in start_consuming
self.connection.process_data_events()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 243, in process_data_events
raise exceptions.ConnectionClosed()
ConnectionClosed
init= pacing_exchange_debug -> pacing_queue_debug -> pacing_routing_key_debug
Server: Start listening for RPC requests...
100 package processed...
100 package processed...
Exception: Traceback (most recent call last):
File "/home/Tom/Pacer/amqp/RPCServer.py", line 46, in start_rpc_server
self.channel.start_consuming()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 955, in start_consuming
self.connection.process_data_events()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 243, in process_data_events
raise exceptions.ConnectionClosed()
ConnectionClosed
К сожалению, мое описание очень unprecise, однако, это благодаря тому, что я абсолютно не знаю, почему мой сценарий аварий. Поэтому любой, действительно любой, совет будет полезен. Благодаря!
EDIT: Журналы ошибок из RabbitMQ добавил:
=INFO REPORT==== 4-Dec-2014::12:55:42 ===
accepting AMQP connection <0.8947.0> (183.13.20.123:61598 -> 183.13.20.123:5672)
=ERROR REPORT==== 4-Dec-2014::12:55:42 ===
Error on AMQP connection <0.8947.0> (183.13.20.123:61598 -> 183.13.20.123:5672, vhost: '/', user: 'username', state: running), channel 1:
{amqp_error,unexpected_frame,
"expected content body, got non content body frame instead",
'basic.publish'}
=INFO REPORT==== 4-Dec-2014::12:55:43 ===
closing AMQP connection <0.8947.0> (183.13.20.123:61598 -> 183.13.20.123:5672)
Дополнительно: У меня есть программа Java, работающих на одной и той же очереди (на самом деле копию сценария питона в Java), который работает без каких-либо проблем.
Является ли какое-либо указание проблемы в журналах RabbitMQ? – HAL
Ваша программа Java изящно пытается подключиться к ошибкам соединения? Любые другие журналы (dmesg/rabbitmq/etc) имеют события во время отключения? –
Нет, моя программа Java не пытается повторно подключиться. Я не обнаружил никаких других проблем в файлах журналов. – toom