2013-12-23 7 views
11

У меня возникла проблема с асинхронным подключением в asyncio.Protocol.data_received обратном вызове нового модуля Python asyncio.Вызов сопрограммы в asyncio.Protocol.data_received

Рассмотрим следующий сервер:

class MathServer(asyncio.Protocol): 

    @asyncio.coroutine 
    def slow_sqrt(self, x): 
     yield from asyncio.sleep(1) 
     return math.sqrt(x) 

    def fast_sqrt(self, x): 
     return math.sqrt(x) 

    def connection_made(self, transport): 
     self.transport = transport 

    #@asyncio.coroutine 
    def data_received(self, data): 
     print('data received: {}'.format(data.decode())) 
     x = json.loads(data.decode()) 
     #res = self.fast_sqrt(x) 
     res = yield from self.slow_sqrt(x) 
     self.transport.write(json.dumps(res).encode('utf8')) 
     self.transport.close() 

используется со следующим клиентом:

class MathClient(asyncio.Protocol): 

    def connection_made(self, transport): 
     transport.write(json.dumps(2.).encode('utf8')) 

    def data_received(self, data): 
     print('data received: {}'.format(data.decode())) 

    def connection_lost(self, exc): 
     asyncio.get_event_loop().stop() 

С self.fast_sqrt который вызывается, все работает, как ожидалось.

С self.slow_sqrt он не работает.

Он также не работает с self.fast_sqrt и @asyncio.coroutine декоратором на data_received.

Я чувствую, что мне не хватает чего-то фундаментального здесь.

Полный код здесь:

Протестировано:

  • Python 3.4.0b1 (Windows)
  • Python 3.3.3 + asyncio-0.2.1 (FreeBSD)

Вопрос такой же на обоих: с slow_sqrt, клиент/сервер просто будет ничего не делать.

ответ

6

Кажется, это необходимо разделить через Future - хотя я все еще не уверен, что это правильный путь.

class MathServer(asyncio.Protocol): 

    @asyncio.coroutine 
    def slow_sqrt(self, x): 
     yield from asyncio.sleep(2) 
     return math.sqrt(x) 

    def fast_sqrt(self, x): 
     return math.sqrt(x) 

    def consume(self): 
     while True: 
     self.waiter = asyncio.Future() 
     yield from self.waiter 
     while len(self.receive_queue): 
      data = self.receive_queue.popleft() 
      if self.transport: 
       try: 
        res = self.process(data) 
        if isinstance(res, asyncio.Future) or \ 
        inspect.isgenerator(res): 
        res = yield from res 
       except Exception as e: 
        print(e) 

    def connection_made(self, transport): 
     self.transport = transport 
     self.receive_queue = deque() 
     asyncio.Task(self.consume()) 

    def data_received(self, data): 
     self.receive_queue.append(data) 
     if not self.waiter.done(): 
     self.waiter.set_result(None) 
     print("data_received {} {}".format(len(data), len(self.receive_queue))) 

    def process(self, data): 
     x = json.loads(data.decode()) 
     #res = self.fast_sqrt(x) 
     res = yield from self.slow_sqrt(x) 
     self.transport.write(json.dumps(res).encode('utf8')) 
     #self.transport.close() 

    def connection_lost(self, exc): 
     self.transport = None 

Вот answer Гвидо ван Россум:

Решение простое: написать эту логику в виде отдельного метод, помеченный с @coroutine, и огонь его в data_received() использованием async() (== Task() , в этом случае). Причина, по которой это не было построено в протокол, заключается в том, что если бы это было так, для реализации сопроцессов потребовалось бы альтернативное событие .

def data_received(self, data): 
    asyncio.ensure_future(self.process_data(data)) 

@asyncio.coroutine 
def process_data(self, data): 
    # ...stuff using yield from... 

Полный код здесь: - Client - Server