Я написал AsyncFile
класс некоторое время назад, интерфейс проще, чем протоколы низкого уровня.
Исходный код здесь: https://github.com/l04m33/pyx/blob/dbaf121ab7bb9bbf04616a7285bcaba757682d03/pyx/io.py#L20
class AsyncFile:
"""A local file class for use with the ``asyncio`` module.
``loop`` should be the event loop in use.
``filename`` is the name of the file to be opened.
``fileobj`` should be a regular file-like object.
``mode`` is the open mode accepted by built-in function ``open``.
If ``filename`` is specified, the named file will be opened. And if
``fileobj`` is specified, that file object will be used directly. You
cannot specify both ``filename`` and ``fileobj``.
This class can be used in a ``with`` statement.
"""
DEFAULT_BLOCK_SIZE = 8192
def __init__(self, loop=None, filename=None,
fileobj=None, mode='rb'):
if (filename is None and fileobj is None) or \
(filename is not None and fileobj is not None):
raise RuntimeError('Confilicting arguments')
if filename is not None:
if 'b' not in mode:
raise RuntimeError('Only binary mode is supported')
fileobj = open(filename, mode=mode)
elif 'b' not in fileobj.mode:
raise RuntimeError('Only binary mode is supported')
fl = fcntl.fcntl(fileobj, fcntl.F_GETFL)
if fcntl.fcntl(fileobj, fcntl.F_SETFL, fl | os.O_NONBLOCK) != 0:
if filename is not None:
fileobj.close()
errcode = ctypes.get_errno()
raise OSError((errcode, errno.errorcode[errcode]))
self._fileobj = fileobj
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
self._rbuffer = bytearray()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def fileno(self):
return self._fileobj.fileno()
def seek(self, offset, whence=None):
if whence is None:
return self._fileobj.seek(offset)
else:
return self._fileobj.seek(offset, whence)
def tell(self):
return self._fileobj.tell()
def _read_ready(self, future, n, total):
if future.cancelled():
self._loop.remove_reader(self._fileobj.fileno())
return
try:
res = self._fileobj.read(n)
except (BlockingIOError, InterruptedError):
return
except Exception as exc:
self._loop.remove_reader(self._fileobj.fileno())
future.set_exception(exc)
return
if not res: # EOF
self._loop.remove_reader(self._fileobj.fileno())
future.set_result(bytes(self._rbuffer))
return
self._rbuffer.extend(res)
if total > 0:
more_to_go = total - len(self._rbuffer)
if more_to_go <= 0: # enough
res, self._rbuffer = self._rbuffer[:n], self._rbuffer[n:]
self._loop.remove_reader(self._fileobj.fileno())
future.set_result(bytes(res))
else:
more_to_go = min(self.DEFAULT_BLOCK_SIZE, more_to_go)
self._loop.add_reader(self._fileobj.fileno(),
self._read_ready,
future, more_to_go, total)
else: # total < 0
# This callback is still registered with total < 0,
# nothing to do here
pass
@asyncio.coroutine
def read(self, n=-1):
future = asyncio.Future(loop=self._loop)
if n == 0:
future.set_result(b'')
else:
try:
res = self._fileobj.read(n)
except (BlockingIOError, InterruptedError):
if n < 0:
self._rbuffer.clear()
self._loop.add_reader(self._fileobj.fileno(),
self._read_ready,
future, self.DEFAULT_BLOCK_SIZE, n)
else:
self._rbuffer.clear()
read_block_size = min(self.DEFAULT_BLOCK_SIZE, n)
self._loop.add_reader(self._fileobj.fileno(),
self._read_ready,
future, read_block_size, n)
except Exception as exc:
future.set_exception(exc)
else:
future.set_result(res)
return future
def _write_ready(self, future, data, written):
if future.cancelled():
self._loop.remove_writer(self._fileobj.fileno())
return
try:
res = self._fileobj.write(data)
except (BlockingIOError, InterruptedError):
return
except Exception as exc:
self._loop.remove_writer(self._fileobj.fileno())
future.set_exception(exc)
return
if res < len(data):
data = data[res:]
self._loop.add_writer(self._fileobj.fileno(),
self._write_ready,
future, data, written + res)
else:
self._loop.remove_writer(self._fileobj.fileno())
future.set_result(written + res)
@asyncio.coroutine
def write(self, data):
future = asyncio.Future(loop=self._loop)
if len(data) == 0:
future.set_result(0)
else:
try:
res = self._fileobj.write(data)
except (BlockingIOError, InterruptedError):
self._loop.add_writer(self._fileobj.fileno(),
self._write_ready,
future, data, 0)
except Exception as exc:
future.set_exception(exc)
else:
future.set_result(res)
return future
def stat(self):
return os.stat(self._fileobj.fileno(), follow_symlinks=True)
def close(self):
self._loop.remove_reader(self._fileobj.fileno())
self._loop.remove_writer(self._fileobj.fileno())
self._fileobj.close()
Я думаю, вам нужно создать свой собственный транспорт и протокол для чтения/записи на телетайп. Посмотрите на источник и попробуйте адаптировать сокеты/подпроцесс транспорта/протокола для вашего пользователя. – gawel
Да, это то, на что похоже, но похоже, что весь eventloop нужно переубедить, так как он запускает создание socketpair, что здесь не актуально. бит озадачен, что кажется, что он полностью не поддерживается (я имею в виду, что чтение/запись в tty - это самый простой вариант использования async, верно?) – time4tea
yep. не должно быть так трудно сделать. и вы ошибаетесь. для использования eventloop не требуется сокет. вы можете хотя бы использовать подпроцесс вместо – gawel