Я пытаюсь написать сценарий сервера/клиента с сервером, который освобождает задачи, и несколькими рабочими, которые его выполняют. Проблема в том, что у моего вентилятора столько задач, что он заполнит память в одно мгновение. Я пытался установить HWM до того, как он свяжется, но без успеха. Он просто продолжает отправлять сообщения, как только рабочий подключается, полностью игнорируя установленный HWM. У меня также есть раковина, которая ведет запись задач, которые были выполнены.ZeroMQ: HWM on PUSH не работает
server.py
import zmq
def ventilate():
context = zmq.Context()
# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.setsockopt(zmq.SNDHWM, 30) #Big messages, so I don't want to keep too many in queue
sender.bind("tcp://*:5557")
# Socket with direct access to the sink: used to syncronize start of batch
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")
print "Sending tasks to workers…"
# The first message is "0" and signals start of batch
sink.send('0')
print "Sent starting signal"
while True:
sender.send("Message")
if __name__=="__main__":
ventilate()
worker.py
import zmq
from multiprocessing import Process
def work():
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")
# Process t asks forever
while True:
msg = receiver.recv_msg()
print "Doing sth with msg %s"%(msg)
sender.send("Message %s done"%(msg))
if __name__ == "__main__":
for worker in range(10):
Process(target=work).start()
sink.py
import zmq
def sink():
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")
# Wait for start of batch
s = receiver.recv()
print "Received start signal"
while True:
msg = receiver.recv_msg()
print msg
if __name__=="__main__":
sink()
Я попытаюсь воспроизвести вашу проблему. Не могли бы вы рассказать мне, какую версию PyZMQ и ZMQ вы используете? Пожалуйста, запустите 'zmq.zmq_version()' и 'zmq .__ version__' –
Версия ZMQ 4.0.3 и pyzmq 13.1.0 – Elvin
-Эх, это раздражающая комбинация. Могли бы вы обновить до pyzmq 14.0.1 и протестировать с этим (я не против, какая версия zmq вы используете, просто дайте мне знать). Я на pyzmq 13.1.0 с zmq 3.x.x на окнах, и это боль, чтобы изменить версию zmq без обновления до pyzmq v14, но я хочу убедиться, что вы все еще видите проблему с этой версией, прежде чем я попытаюсь воспроизвести –