2016-10-24 12 views
1

Небольшой контекст: я открываю этот вопрос, возникший here, после решения проблемы аутентификации. Я предпочитаю открывать новую, чтобы избежать загрязнения предыдущей комментариями, не относящимися к исходной проблеме, и дать ей правильную видимость.Клиент Python SOAP с Zipp - импортовым пространством имен

Я работаю над клиентом SOAP, работающим в той же интрасети, что и сервер, без доступа в Интернет.

from requests.auth import HTTPBasicAuth 
from zeep import Client 
from zeep.transports import Transport 

wsdl = 'http://mysite.dom/services/MyWebServices?WSDL' 
client = Client(wsdl, transport=HTTPBasicAuth('user','pass'), cache=None) 

Проблема: WSDL содержит импорт на внешний ресурс, расположенный вне локальной сети («импорта пространства имен =„schemas.xmlsoap.org/soap/encoding/“») и, следовательно, клиент Zeep экземпляра терпит неудачу с:

Exception: HTTPConnectionPool(host='schemas.xmlsoap.org', port=80): Max retries exceeded with url: /soap/encoding/ (Caused by NewConnectionError('<requests.packages.urllib3.connection.HTTPConnection object at 0x7f3dab9d30b8>: Failed to establish a new connection: [Errno 110] Connection timed out',)) 

Вопрос: возможно ли (и имеет ли смысл) создавать Zeep Client без доступа к внешнему ресурсу?

В качестве дополнительной информации другой клиент, написанный на Java, на основе XML rpc ServiceFactory, кажется более устойчивым к этой проблеме, служба создается (и работает), даже если интернет-соединение недоступно. Действительно ли нужно импортировать пространство имен из xmlsoap.org?

Edit, после ответа от @mvt:

Итак, я пошел на предложенное решение, которое позволяет мне одновременно контролировать доступ к внешним ресурсам (читай: запретить доступ к серверам, отличным от на котором размещается конечная точка).

class MyTransport(zeep.Transport): 
    def load(self, url): 
     if not url: 
      raise ValueError("No url given to load") 
     parsed_url = urlparse(url) 
     if parsed_url.scheme in ('http', 'https'): 
      if parsed_url.netloc == "myserver.ext": 
       response = self.session.get(url, timeout=self.load_timeout) 
       response.raise_for_status() 
       return response.content 
      elif url == "http://schemas.xmlsoap.org/soap/encoding/": 
       url = "/some/path/myfile.xsd" 
      else: 
       raise 
     elif parsed_url.scheme == 'file': 
      if url.startswith('file://'): 
       url = url[7:] 
     with open(os.path.expanduser(url), 'rb') as fh: 
      return fh.read() 

ответ

3

Вы можете создать свой собственный подкласс класса Tranport и добавить дополнительную логику к методу нагрузки(), так что конкретные URL перенаправляются/загружаются из файловой системы.

Код довольно легко я думаю: https://github.com/mvantellingen/python-zeep/blob/master/src/zeep/transports.py :-)

+0

Просто отлично, спасибо Майкл! В качестве примера я напишу свое решение в вопросе. – Pintun