2015-03-29 4 views
3

У меня есть сценарий, в котором основной поток принимает входные данные из stdin, а затем передает его дочернему потоку с использованием очереди. В дочернем потоке я использую asyncio coroutines, чтобы развернуть слушателя в сокете и ждать соединений. После подключения я могу теперь отправлять данные через слушателя из основного потока.Python queue linking object running asyncio coroutines с вводом основного потока

Все, кажется, работает достаточно хорошо, но поскольку asyncio.BaseEventLoop не является потокобезопасным, я столкнулся с проблемами?

Это моя попытка решить проблему использования блокирующей библиотеки, такой как cmd-модуль python с asyncio.

Мой код ниже.

import sys 
import asyncio 
from time import sleep 
from threading import Thread 
from queue import Queue 

stdin_q = Queue() 

clients = {} # task -> (reader, writer) 

def client_connected_handler(client_reader, client_writer): 
    # Start a new asyncio.Task to handle this specific client connection 
    task = asyncio.Task(handle_client(client_reader, client_writer)) 
    clients[task] = (client_reader, client_writer) 

    def client_done(task): 
     # When the tasks that handles the specific client connection is done 
     del clients[task] 

    # Add the client_done callback to be run when the future becomes done 
    task.add_done_callback(client_done) 

@asyncio.coroutine 
def handle_client(client_reader, client_writer): 
    # Handle the requests for a specific client with a line oriented protocol 
    while True: 

     cmd = yield from get_input() 
     client_writer.write(cmd.encode()) 

     data = yield from client_reader.read(1024) 

     print(data.decode(),end="",flush=True) 

@asyncio.coroutine 
def get_input(): 
    while True: 
    try: 
     return stdin_q.get() 
    except: 
     pass 



class Control: 

    def start(self): 
     loop = asyncio.new_event_loop() 
     asyncio.set_event_loop(loop) 
     self.loop = asyncio.get_event_loop() 

     server = self.loop.run_until_complete(asyncio.start_server(client_connected_handler, '0.0.0.0', 2222)) 
     self.loop.run_forever() 
     self.stop() 

    def stop(self):  
     self.loop.stop() 
     self.loop.close() 

def fire_control(): 
    con = Control() 
    con.start() 

if __name__ == "__main__": 

    stdin_q.put("\n") 
    t = Thread(target=fire_control) 
    t.start() 
    sleep(2) 
    _cmd = "" 
    while _cmd.lower() != "exit": 
     _cmd = input("") 
     if _cmd == "": 
      _cmd = "\r\n" 

     stdin_q.put(_cmd)  

ответ

4

Это не будет работать совсем правильно, потому что вызов stdin_q.get() собирается заблокировать ваш цикл событий. Это означает, что если ваш сервер имеет несколько клиентов, все они будут полностью заблокированы тем, что произойдет, прежде чем попасть в stdin_q.get(), пока вы не отправите данные в очередь. Самый простой способ обойти это использовать BaseEvent.loop.run_in_executor запустить stdin_q.get в фоновом режиме ThreadPoolExecutor, что позволяет ждать его без блокировки цикла обработки событий:

@asyncio.coroutine 
def get_input(): 
    loop = asyncio.get_event_loop() 
    return (yield from loop.run_in_executor(None, stdin_q.get)) # None == use default executor. 

Edit (1/27/16):

Существует библиотека под названием janus, которая обеспечивает асинхронную, поточно-безопасную реализацию очереди.

Используя эту библиотеку, ваш код будет выглядеть следующим образом (я ушел из неизменные части):

... 
import janus 

loop = asyncio.new_event_loop() 
stdin_q = janus.Queue(loop=loop) 
... 

@asyncio.coroutine 
def get_input(): 
    loop = asyncio.get_event_loop() 
    return (yield from stdin_q.async_q.get()) 

class Control: 

    def start(self): 
     asyncio.set_event_loop(loop) 
     self.loop = asyncio.get_event_loop() 

     server = self.loop.run_until_complete(asyncio.start_server(client_connected_handler, '0.0.0.0', 2222)) 
     self.loop.run_forever() 
     self.stop() 

    def stop(self):  
     self.loop.stop() 
     self.loop.close() 

... 

if __name__ == "__main__": 

    stdin_q.sync_q.put("\n") 
    t = Thread(target=runner) 
    t.start() 
    sleep(2) 
    _cmd = "" 
    while _cmd.lower() != "exit": 
     _cmd = input("") 
     if _cmd == "": 
      _cmd = "\r\n" 

     stdin_q.sync_q.put(_cmd) 
+0

Интересно. Вы являетесь богатым знанием, когда дело касается асинчо. Поэтому, если бы я хотел отправить данные в конкретное соединение, мне просто нужно создать очередь для каждого из них для связи между основным потоком и дочерним? – RG5

+0

@ RG5 Да, это сработает. – dano

+0

Это правда, но вы также можете просто использовать 'asyncio.Queue', который, я думаю, позволит вам« ждать »stdin_q.get()' – dalanmiller

 Смежные вопросы

  • Нет связанных вопросов^_^