2015-02-11 3 views
1

У меня есть два потока. Один из них - Worker Thread, другой - Communication Thread.ZMQ пара (для сигнализации) блокируется из-за плохого соединения

Worker Thread считывает данные с последовательного порта, выполняет некоторую обработку и затем отображает результаты, отправляемые на сервер.

Communication Tthread читает результаты с очереди и отправляет их. Проблема заключается в том, что соединение является беспроводным, и хотя обычно оно присутствует, оно может быть пятнистым (падение и выход за пределы диапазона в течение нескольких минут), и я не хочу блокировать Worker Thread, если я потеряю связь.

Шаблон я выбрал для этого, заключается в следующем:

Worker Thread имеет enqueue метод, который добавляет сообщение к Queue, а затем послать сигнал inproc://signal с использованием zmq.PAIR.

Communication Thread использует zmq.DEALER для связи с сервером (zmq.ROUTER), но опрашивает inproc://signal пару для того, чтобы зарегистрировать есть ли новое сообщение необходимости передачи или нет.

Ниже приведен упрощенный пример шаблона:

import Queue 
import zmq 
import time 
import threading 
import simplejson 


class ZmqPattern(): 
    def __init__(self): 
     self.q_out = Queue.Queue() 
     self.q_in = Queue.Queue() 
     self.signal = None 
     self.API_KEY = 'SOMETHINGCOMPLEX' 
     self.zmq_comm_thr = None 

    def start_zmq_signal(self): 
     self.context = zmq.Context() 

     # signal socket for waking the zmq thread to send messages to the relay 
     self.signal = self.context.socket(zmq.PAIR) 
     self.signal.bind("inproc://signal") 

    def enqueue(self, msg): 
     print("> pre-enqueue") 
     self.q_out.put(msg) 
     print("< post-enqueue") 

     print(") send sig") 
     self.signal.send(b"") 
     print("(sig sent") 

    def communication_thread(self, q_out): 
     poll = zmq.Poller() 

     self.endpoint_url = 'tcp://' + '127.0.0.1' + ':' + '9001' 

     wake = self.context.socket(zmq.PAIR) 
     wake.connect("inproc://signal") 
     poll.register(wake, zmq.POLLIN) 

     self.socket = self.context.socket(zmq.DEALER) 
     self.socket.setsockopt(zmq.IDENTITY, self.API_KEY) 
     self.socket.connect(self.endpoint_url) 
     poll.register(self.socket, zmq.POLLIN) 

     while True: 
      sockets = dict(poll.poll()) 

      if self.socket in sockets: 
       message = self.socket.recv() 
       message = simplejson.loads(message) 

       # Incomming messages which need to be handled on the worker thread 
       self.q_in.put(message) 

      if wake in sockets: 
       wake.recv() 
       while not q_out.empty(): 
        print(">> Popping off Queue") 
        message = q_out.get() 
        print(">>> Popped off Queue") 
        message = simplejson.dumps(message) 
        print("<<< About to be sent") 
        self.socket.send(message) 
        print("<< Sent") 

    def start(self): 
     self.start_zmq_signal() 
     # ZMQ Thread 
     self.zmq_comm_thr = threading.Thread(target=self.communication_thread, args=([self.q_out])) 
     self.zmq_comm_thr.daemon = True 
     self.zmq_comm_thr.name = "ZMQ Thread" 
     self.zmq_comm_thr.start() 


if __name__ == '__main__': 
    test = ZmqPattern() 
    test.start() 

    print '###############################################' 
    print '############## Starting comms #################' 
    print "###############################################" 

    last_debug = time.time() 
    test_msg = {} 
    for c in xrange(1000): 
     key = 'something{}'.format(c) 
     val = 'important{}'.format(c) 
     test_msg[key] = val 

    while True: 
     test.enqueue(test_msg) 
     if time.time() - last_debug > 1: 
      last_debug = time.time() 
      print "Still alive..." 

Если запустить это, вы будете видеть дилерские блоки, как нет маршрутизатора на другом конце, и вскоре после того, как пара блоков поскольку Communication Thread не принимает

Как лучше всего настроить inproc zmq, чтобы не блокировать Worker Thread.

FYI, наиболее полная система должна буферизировать порядка 200 тыс. Сообщений, а каждое сообщение составляет около 256 байт.

+0

Так вы используете InProc ZMQ для арбитража доступа к питона 'Queue'. Почему бы не использовать ZMQ в очереди? PUSH-PULL приходит на ум. – engineerC

+0

Не будет ли это так же блокироваться? – Jono

+0

Да, это был скорее общий комментарий. Ответьте ниже, почему вы блокируете. – engineerC

ответ

1

Дилерский сокет имеет ограничение на количество сообщений, которое он будет хранить, называемый знаком высокой воды. Прямо под созданием дилерской розетки попробуйте:

self.socket = self.context.socket(zmq.DEALER) 
    self.socket.setsockopt(zmq.SNDHWM, 200000) 

И установите это число так высоко, как вы посмеете; предел - это память вашей машины.

EDIT:

Некоторые хорошие обсуждение высокие оценки воды в этом вопросе:

Majordomo broker: handling large number of connections

+0

Спасибо за предложение. Я хотел задать вопрос, прежде чем принимать меры, поскольку я никогда не полностью уверен в последствиях. Если я установил HWM в гнездо Дилера, он все равно будет получать все сигналы inproc от гнезда пары, и все будет продолжаться так, как ожидается, правильно? Считаете ли вы, что лучшим решением является его буферизация в буфере отправителя/ядра, а не использование семафора между рабочим потоком и коммуникационным потоком - буферизация сообщений в очередь, но без отправки сигнала inproc? – Jono

+0

Предполагая, что вы никогда не хотите удалять сообщения, и если ваш потребитель может уйти в любой момент, вы должны либо 1) иметь неограниченную очередь, в которой их хранить, либо 2) приостановить процесс, создающий их, когда вы достигнете предела вашей очереди. Одно неясно: должен ли работник работать по какой-то другой причине, кроме как для создания этих сообщений; то есть выполняет ли она какую-то необходимую функцию, которая должна выполняться независимо от того, слушает ли кто-нибудь? – engineerC

+0

Что касается сигнала, я чувствую, что это совершенно не нужно, потому что если вы в очереди, вы всегда можете сделать блокировку ожидания в самой очереди, которая будет разблокирована, как только новые элементы будут добавлены в очередь. – engineerC