2014-01-15 1 views
3

Я пытаюсь написать сценарий сервера/клиента с сервером, который освобождает задачи, и несколькими рабочими, которые его выполняют. Проблема в том, что у моего вентилятора столько задач, что он заполнит память в одно мгновение. Я пытался установить HWM до того, как он свяжется, но без успеха. Он просто продолжает отправлять сообщения, как только рабочий подключается, полностью игнорируя установленный HWM. У меня также есть раковина, которая ведет запись задач, которые были выполнены.ZeroMQ: HWM on PUSH не работает

server.py

import zmq 

def ventilate(): 
    context = zmq.Context() 

    # Socket to send messages on 
    sender = context.socket(zmq.PUSH) 
    sender.setsockopt(zmq.SNDHWM, 30) #Big messages, so I don't want to keep too many in queue 
    sender.bind("tcp://*:5557") 


    # Socket with direct access to the sink: used to syncronize start of batch 
    sink = context.socket(zmq.PUSH) 
    sink.connect("tcp://localhost:5558") 

    print "Sending tasks to workers…" 

    # The first message is "0" and signals start of batch 
    sink.send('0') 
    print "Sent starting signal" 

    while True: 
     sender.send("Message") 



if __name__=="__main__": 
    ventilate() 

worker.py

import zmq 
from multiprocessing import Process 

def work(): 
    context = zmq.Context() 

    # Socket to receive messages on 
    receiver = context.socket(zmq.PULL) 
    receiver.connect("tcp://localhost:5557") 

    # Socket to send messages to 
    sender = context.socket(zmq.PUSH) 
    sender.connect("tcp://localhost:5558") 

    # Process t asks forever 
    while True: 
     msg = receiver.recv_msg() 
     print "Doing sth with msg %s"%(msg)  
     sender.send("Message %s done"%(msg)) 

if __name__ == "__main__": 
    for worker in range(10):   
     Process(target=work).start() 

sink.py

import zmq 

def sink(): 
    context = zmq.Context() 

    # Socket to receive messages on 
    receiver = context.socket(zmq.PULL) 
    receiver.bind("tcp://*:5558") 

    # Wait for start of batch 
    s = receiver.recv() 
    print "Received start signal" 
    while True: 
     msg = receiver.recv_msg() 
     print msg 


if __name__=="__main__": 
    sink() 
+0

Я попытаюсь воспроизвести вашу проблему. Не могли бы вы рассказать мне, какую версию PyZMQ и ZMQ вы используете? Пожалуйста, запустите 'zmq.zmq_version()' и 'zmq .__ version__' –

+0

Версия ZMQ 4.0.3 и pyzmq 13.1.0 – Elvin

+0

-Эх, это раздражающая комбинация. Могли бы вы обновить до pyzmq 14.0.1 и протестировать с этим (я не против, какая версия zmq вы используете, просто дайте мне знать). Я на pyzmq 13.1.0 с zmq 3.x.x на окнах, и это боль, чтобы изменить версию zmq без обновления до pyzmq v14, но я хочу убедиться, что вы все еще видите проблему с этой версией, прежде чем я попытаюсь воспроизвести –

ответ

5

Хорошо, я игра вокруг, я не думаю, что вопрос с PUSH HWM, а скорее, что вы не можете установить HWM для PULL. Если вы посмотрите на this documentation, вы увидите, что он говорит N/A для действия на HWM.

Кажется, что сокеты PULL принимают сотни сообщений (и я попытался установить HWM на всякий случай, если он что-то сделал на гнезде PULL, а это не так.). Об этом я убедился, изменив вентилятор для отправки сообщений с добавочным целым числом и сменив каждого рабочего в пуле, чтобы подождать 2 секунды между вызовами до recv(). Рабочие распечатывают, что обрабатывают сообщения с совершенно разными целыми числами. Например, один рабочий будет работать с сообщением 10, а следующий работает над сообщением 400. По прошествии времени вы видите, что рабочий, который обрабатывал сообщение 10, теперь обрабатывает сообщение 11, 12, 13 и т. Д., В то время как другое - обработка 401, 402 и т. д.

Это указывает на то, что сокет ZMQ_PULL буферизует сообщения где-то. Таким образом, в то время как в гнезде ZMQ_PUSH есть HWM, сокет PULL быстро запрашивает сообщения, несмотря на то, что на них фактически не обращаются по вызову recv(). Таким образом, результаты PUSH HWM эффективно игнорируются, если подключен PULL-разъем. Насколько я могу судить, вы не можете контролировать длину буфера сокета PULL (я бы ожидал, что опция сокета RCVHWM будет управлять этим, но он не появляется).

Такое поведение, разумеется, вызывает вопрос, какова точка опции ZMQ_PULL HWM, что имеет смысл иметь, если вы также можете управлять приемными сокетами HWM.

На этом этапе я бы начал спрашивать 0MQ people, есть ли у вас что-то очевидное или если это считается ошибкой.

Извините, я не мог больше помочь!

+0

Большое спасибо за усилия, которые вы сделали до сих пор. Я выяснил, что установка setsockopt (zmq.RCVBUF, 2) на самом деле замедлит работу. По умолчанию он установлен в 0, что означает, что размер буфера по умолчанию для операционной системы. Не знаю, что это. Он по-прежнему не совсем делает то, что я хочу, но он приближается. – Elvin

1

ZeroMQ имеет буферы на обоих приемных и приемных концах гнезда, поэтому вам необходимо установить высокие метки воды как на PUSH, так и на гнездо PULL в вашем коде (и действительно, до bind() или connect()).

В привязках Python это удобно сделать через socket.hwm = 1, который установит как ZMQ_SNDHWM, так и ZMQ_RCVHWM за один раз.