У меня есть два потока. Один из них - Worker Thread
, другой - Communication Thread
.ZMQ пара (для сигнализации) блокируется из-за плохого соединения
Worker Thread
считывает данные с последовательного порта, выполняет некоторую обработку и затем отображает результаты, отправляемые на сервер.
Communication Tthread
читает результаты с очереди и отправляет их. Проблема заключается в том, что соединение является беспроводным, и хотя обычно оно присутствует, оно может быть пятнистым (падение и выход за пределы диапазона в течение нескольких минут), и я не хочу блокировать Worker Thread
, если я потеряю связь.
Шаблон я выбрал для этого, заключается в следующем:
Worker Thread
имеет enqueue
метод, который добавляет сообщение к Queue
, а затем послать сигнал inproc://signal
с использованием zmq.PAIR
.
Communication Thread
использует zmq.DEALER
для связи с сервером (zmq.ROUTER
), но опрашивает inproc://signal
пару для того, чтобы зарегистрировать есть ли новое сообщение необходимости передачи или нет.
Ниже приведен упрощенный пример шаблона:
import Queue
import zmq
import time
import threading
import simplejson
class ZmqPattern():
def __init__(self):
self.q_out = Queue.Queue()
self.q_in = Queue.Queue()
self.signal = None
self.API_KEY = 'SOMETHINGCOMPLEX'
self.zmq_comm_thr = None
def start_zmq_signal(self):
self.context = zmq.Context()
# signal socket for waking the zmq thread to send messages to the relay
self.signal = self.context.socket(zmq.PAIR)
self.signal.bind("inproc://signal")
def enqueue(self, msg):
print("> pre-enqueue")
self.q_out.put(msg)
print("< post-enqueue")
print(") send sig")
self.signal.send(b"")
print("(sig sent")
def communication_thread(self, q_out):
poll = zmq.Poller()
self.endpoint_url = 'tcp://' + '127.0.0.1' + ':' + '9001'
wake = self.context.socket(zmq.PAIR)
wake.connect("inproc://signal")
poll.register(wake, zmq.POLLIN)
self.socket = self.context.socket(zmq.DEALER)
self.socket.setsockopt(zmq.IDENTITY, self.API_KEY)
self.socket.connect(self.endpoint_url)
poll.register(self.socket, zmq.POLLIN)
while True:
sockets = dict(poll.poll())
if self.socket in sockets:
message = self.socket.recv()
message = simplejson.loads(message)
# Incomming messages which need to be handled on the worker thread
self.q_in.put(message)
if wake in sockets:
wake.recv()
while not q_out.empty():
print(">> Popping off Queue")
message = q_out.get()
print(">>> Popped off Queue")
message = simplejson.dumps(message)
print("<<< About to be sent")
self.socket.send(message)
print("<< Sent")
def start(self):
self.start_zmq_signal()
# ZMQ Thread
self.zmq_comm_thr = threading.Thread(target=self.communication_thread, args=([self.q_out]))
self.zmq_comm_thr.daemon = True
self.zmq_comm_thr.name = "ZMQ Thread"
self.zmq_comm_thr.start()
if __name__ == '__main__':
test = ZmqPattern()
test.start()
print '###############################################'
print '############## Starting comms #################'
print "###############################################"
last_debug = time.time()
test_msg = {}
for c in xrange(1000):
key = 'something{}'.format(c)
val = 'important{}'.format(c)
test_msg[key] = val
while True:
test.enqueue(test_msg)
if time.time() - last_debug > 1:
last_debug = time.time()
print "Still alive..."
Если запустить это, вы будете видеть дилерские блоки, как нет маршрутизатора на другом конце, и вскоре после того, как пара блоков поскольку Communication Thread
не принимает
Как лучше всего настроить inproc zmq, чтобы не блокировать Worker Thread
.
FYI, наиболее полная система должна буферизировать порядка 200 тыс. Сообщений, а каждое сообщение составляет около 256 байт.
Так вы используете InProc ZMQ для арбитража доступа к питона 'Queue'. Почему бы не использовать ZMQ в очереди? PUSH-PULL приходит на ум. – engineerC
Не будет ли это так же блокироваться? – Jono
Да, это был скорее общий комментарий. Ответьте ниже, почему вы блокируете. – engineerC