2015-01-20 4 views
1

Я хотел бы использовать ZMQ для реализации (в python) брокера и клиента, который обрабатывает запрос-ответ адресованным сущностям асинхронно. Клиент содержит функциональные возможности для выполнения как запросов, так и ответов (единственное, что отсутствует, - это точный тип/тип сокета).ZMQ Python Request Reply Broker для адресованных асинхронных запросов

Запрос может блокироваться, но сторона ответа должна иметь возможность обрабатывать параллельные (потоковые) запросы по мере их поступления. (То есть REP-сокет недостаточно хорош, так как он требует отправки до следующего получения)

Он должен пройти через посредника, так как существует множество возможных объектов, которые могут выполнять запросы и ответы, и я хочу только привязать определенное количество портов (не к одному объекту).

Entity1    Broker     Entity2 
    REQ ------------- ROUTER ?????? -------------- ?????? 

Entity1 будет знать идентификатор entity2 и использовать, чтобы убедиться, что запрос был сделан на entity2 специально. Может быть любое количество объектов, но все объекты, которые должны отвечать на запросы, будут регистрировать идентификаторы.

Я пробовал с ДИЛЕРОМ на правой стороне брокера выше, но тот будет отправлять запросы только кругом.

Так кто-нибудь знает хороший образец/набор сокетов, я мог бы использовать для асинхронного обращения к определенному объекту?

Резюме:

  • Блокирование по запросу стороны
  • Брокер/прокси для связывания фиксированного количества портов
  • Отвечающему гнездо должно быть специально рассмотрены в запрашивающей
  • Резьбовая ответы (Ответная сторона может принимать и обрабатывать параллельных запросов)

Я читал руководство ZMQ довольно подробно, но я не нашел реального хорошего шаблона для обращения к конкретным сокетам через брокера, поэтому любая помощь очень ценится.

ответ

1

После нескольких дальнейших исследований и испытаний я нашел образец, который, как представляется, обеспечивает решение всех моих требований.

Pattern

Requester    Broker     Replier 
    REQ ------------- ROUTER ROUTER -------------- DEALER 
       (requests) (replies) 

Запроса

Запрос на стороне клиента просто подключается к запросу маршрутизатора от брокера, посылает запрос и начинает читать сокет для ответа:

reqSocket.connect(self._reqAddress) 
reqSocket.send_multipart([repId, message]) 
reply = reqSocket.recv_multipart()[0] 

Идентификатор ответа включен как первая часть сообщение, например:

Outgoing message: ['replierId', 'requestMsg'] 

Запрос Маршрутизатор

if self.reqRouterSocket in socketEvents: 
    multipart = self.reqRouterSocket.recv_multipart() 
    multipart = [multipart[-2]] + multipart 
    del multipart[-2] 
    self.repRouterSocket.send_multipart(multipart) 

Т.е., запрос маршрутизатора просто перемещает первую часть полезной нагрузки (будучи в replierId) и помещает его сначала в стеке адреса :

Incoming message: ['reqSocketAddr', '', 'replierId', 'requestMsg'] 
Outgoing message: ['replierId', 'reqSocketAddr', '', 'requestMsg'] 

Исходящее сообщение отправляется с маршрутизатора-ответчика. Поскольку у replier есть идентификатор сокета, установленный на «replierId» и подключенный к маршрутизатору-ответчику, этот маршрутизатор распознает этот адрес и может успешно доставить запрос.

Replier

replier нужно установить его собственную идентичность сокет к некоторому известному значению для того, чтобы быть непосредственно на имя, как описано выше.

ПРИМЕЧАНИЕ: Вы должны установить идентификатор гнезда разъема DEALER ДО того, как вы выполняете подключение к ответному маршрутизатору. Чтобы установить идентичность гнезда:

self.dealerSocket.setsockopt(zmq.IDENTITY, 'replierId') 

Иначе маршрутизатор не будет знать идентификатор и будет передавать сообщения.

Ответчик прослушивает входящие запросы. В моем случае это все threaded, и запросы обрабатываются асинхронно. В этом причина использования гнезда DEALER вместо обычного REP, который в синхронном случае будет намного проще. Гнездо DEALER может получать дополнительные запросы, не отвечая сначала на первый, который должен выполнить REP. Упрощенная версия о том, что делается на стороне replier однако:

multipart = self.dealerSocket.recv_multipart() 
returnRoute = multipart[:-1] 
requestMsg = multipart[-1] 
reply = someFunction(requestMsg) 
self.dealerSocket.send_multipart(returnRoute + [reply]) 

Т.е., replier просто возвращает то, что он получил, но с просьбой изменилось за ответ вместо:

Incoming message: ['replierId', 'reqSocketAddr', '', 'request'] 
Outgoing message: ['replierId', 'reqSocketAddr', '', 'reply'] 

Это уходящей сообщение затем отправляется обратно на ответный маршрутизатор.

Ответить Маршрутизатор

Маршрутизатор выбран на этой стороне брокера исключительно из-за тот факт, что он нуждается функциональностью для решения конкретного сокета среди многих подключенных из них.

if self.repRouterSocket in socketEvents: 
    multipart = self.repRouterSocket.recv_multipart() 
    self.reqRouterSocket.send_multipart(multipart[1:]) 

Т.е. просто введите первый адрес стека адресов и снова отправьте сообщение на запрашивающую сторону.

Incoming message: ['replierId', 'reqSocketAddr', '', 'reply'] 
Outgoing message: ['reqSocketAddr', '', 'reply'] 

Запрос маршрутизатор распознает этот адрес и отправляет запрос обратно к запрашивающей, который получает:

Incoming list: ['reply'] 

Эта модель, кажется, отвечают требованиям, которые я сделал в моем вопросе. Надеюсь, он может быть полезен и для других.

+0

Спасибо! Я столкнулся с той же проблемой, и ваш ответ помог мне. – gar