2013-04-13 3 views
1

У меня есть некоторый код, который отслеживает некоторые другие изменяющиеся файлы, что я хотел бы сделать, это запустить этот код, который использует zeromq с другим сокетом, так как им это делает, кажется, вызывает утверждения провалиться где-то в libzmq, так как я могу повторно использовать один и тот же сокет. как я могу обеспечить, когда я создаю новый процесс из класса монитора, контекст не будет использоваться повторно? это то, что я думаю, происходит, если вы можете сказать, что есть какая-то другая глупость с моей стороны, пожалуйста, сообщите. здесь некоторый код:pyzmq создать процесс со своим сокетом

import zmq 
from zmq.eventloop import ioloop 
from zmq.eventloop.zmqstream import ZMQStream 
class Monitor(object): 
    def __init(self) 
     self.context = zmq.Context() 
     self.socket = self.context.socket(zmq.DEALER) 
     self.socket.connect("tcp//127.0.0.1:5055") 
     self.stream = ZMQStream(self._socket) 
     self.stream.on_recv(self.somefunc) 

    def initialize(self,id) 
     self._id = id 

    def somefunc(self, something) 
     """work here and send back results if any """ 
     import json 
     jdecoded = json.loads(something) 
     if self_id == jdecoded['_id'] 
      """ good im the right monitor for you """ 
      work = jdecoded['message'] 
      results = algorithm (work) 
      self.socket.send(json.dumps(results)) 
     else: 
      """let some other process deal with it, not mine """ 
      pass 

class Prefect(object): 
    def __init(self, id) 
     self.context = zmq.Context() 
     self.socket = self.context.socket(zmq.DEALER) 
     self.socket.bind("tcp//127.0.0.1:5055") 
     self.stream = ZMQStream(self._socket) 
     self.stream.on_recv(self.check_if) 
     self._id = id 
     self.monitors = [] 
    def check_if(self,message): 
     """find out from message's id whether we have 
      started a proces for it previously""" 
     import json 
     jdecoded = json.loads(message) 
     this_id = jdecoded['_id'] 
     if this_id in self.monitors: 
      pass 
     else: 
      """start new process for it should have its won socket """ 
      new = Monitor() 
      import Process 
      newp = Process(target=new.initialize,args=(this_id)) 
      newp.start() 
      self.monitors.append(this_id) ## ensure its remembered 

, что происходит в том, что я хочу, чтобы все ПРОЦЕССЫ монитора и один процесс префекта прослушивания на тот же порт, поэтому, когда префект видит запрос, который hasnt видел он запускает процесс для него все процессы, которые существуют, вероятно, должны также слушать, но игнорировать сообщения, не предназначенные для них. как он, если я это сделаю, я получаю некоторый сбой, возможно, связанный с одновременным доступом одного и того же сокета zmq чем-то (я попробовал threading.thread, все еще сбой). Я читал где-то, что одновременный доступ к zmq-сокет разными потоками не возможное. Как я могу гарантировать, что новые процессы получат свои собственные zmq-сокеты?

EDIT: главное дело в моем приложении является то, что запрос приходит через zmq розетку, и процесс (ы) вот слушающие реагирует на сообщение от:

1. If its directed at that process judged by the _id field, do some reading on a file and reply since one of the monitors match the messages _id, if none match, then: 
2 If the messages _id files is not recognized, all monitors ignore it but the Prefect creates a process to handle that _id and all future messages to that id. 
3. I want all the messages to be seen by the monitor processes as well as the prefect process, seems that seems easiest, 
4. All the messages are very small, avarage ~4096 bytes. 
5. The monitor does some non-blocking read and for each ioloop it sends what it has found out 

более-Edit => и процесс префекта теперь связывается, и он будет получать сообщения и эхо их, чтобы их могли видеть мониторы. Это то, что я имею в виду, так как архитектура, но не окончательная. .

Все сообщения поступают от удаленных пользователей через браузер, что позволяет серверу знать, чего хочет клиент, и сервер отправляет сообщение на сервер через zmq (я не показывал это, но не сложно), поэтому в производстве они могут не связываться/подключаться к localhost. Я выбрал DEALER, так как он разрешает asyc/неограниченные сообщения в любом направлении (см. Пункт 5.), и DEALER может связываться с DEALER, и исходный запрос/ответ может поступать с обеих сторон. Другой, который может это сделать, возможно, DEALER/ROUTER.

ответ

2

Вы правы, что не можете использовать один и тот же сокет на границе вилки (для многопроцессорности используется fork). В общем, это означает, что вы не хотите создавать сокет, который будет использоваться в разветвленном процессе, до тех пор, пока не начнется подпроцесс. Поскольку в вашем случае сокет является атрибутом объекта Monitor, вы не хотите вообще создавать монитор в основном процессе. Это будет выглядеть примерно так:

def start_monitor(this_id): 
    monitor = Monitor() 
    monitor.initialize(this_id) 
    # run the eventloop, or this will return immediately and destroy the monitor 

... inside Prefect.check_if(): 

    proc = Process(target=start_monitor, args=(this_id,)) 
    proc.start() 
    self.monitors.append(this_id) 

, а не ваш пример, где единственное, что подпроцесс делает это присвоить идентификатор, а затем убить процесс, в конечном счете, не имея никакого эффекта.

+0

Право, позвольте мне дать вам пример и вернуться к вам. хорошая идея. – mike

+0

хорошо, и приятно поймать. Я не мог этого видеть. – mike

+0

+1 Хороший ответ. Я новичок в zeromq, но это потрясающая библиотека :) – incognick