У меня есть сценарий, в котором основной поток принимает входные данные из stdin, а затем передает его дочернему потоку с использованием очереди. В дочернем потоке я использую asyncio coroutines, чтобы развернуть слушателя в сокете и ждать соединений. После подключения я могу теперь отправлять данные через слушателя из основного потока.Python queue linking object running asyncio coroutines с вводом основного потока
Все, кажется, работает достаточно хорошо, но поскольку asyncio.BaseEventLoop не является потокобезопасным, я столкнулся с проблемами?
Это моя попытка решить проблему использования блокирующей библиотеки, такой как cmd-модуль python с asyncio.
Мой код ниже.
import sys
import asyncio
from time import sleep
from threading import Thread
from queue import Queue
stdin_q = Queue()
clients = {} # task -> (reader, writer)
def client_connected_handler(client_reader, client_writer):
# Start a new asyncio.Task to handle this specific client connection
task = asyncio.Task(handle_client(client_reader, client_writer))
clients[task] = (client_reader, client_writer)
def client_done(task):
# When the tasks that handles the specific client connection is done
del clients[task]
# Add the client_done callback to be run when the future becomes done
task.add_done_callback(client_done)
@asyncio.coroutine
def handle_client(client_reader, client_writer):
# Handle the requests for a specific client with a line oriented protocol
while True:
cmd = yield from get_input()
client_writer.write(cmd.encode())
data = yield from client_reader.read(1024)
print(data.decode(),end="",flush=True)
@asyncio.coroutine
def get_input():
while True:
try:
return stdin_q.get()
except:
pass
class Control:
def start(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.loop = asyncio.get_event_loop()
server = self.loop.run_until_complete(asyncio.start_server(client_connected_handler, '0.0.0.0', 2222))
self.loop.run_forever()
self.stop()
def stop(self):
self.loop.stop()
self.loop.close()
def fire_control():
con = Control()
con.start()
if __name__ == "__main__":
stdin_q.put("\n")
t = Thread(target=fire_control)
t.start()
sleep(2)
_cmd = ""
while _cmd.lower() != "exit":
_cmd = input("")
if _cmd == "":
_cmd = "\r\n"
stdin_q.put(_cmd)
Интересно. Вы являетесь богатым знанием, когда дело касается асинчо. Поэтому, если бы я хотел отправить данные в конкретное соединение, мне просто нужно создать очередь для каждого из них для связи между основным потоком и дочерним? – RG5
@ RG5 Да, это сработает. – dano
Это правда, но вы также можете просто использовать 'asyncio.Queue', который, я думаю, позволит вам« ждать »stdin_q.get()' – dalanmiller