Даже если в реализации BufferedIOBase
классов Оберните IOBase
объект, их интерфейс представляет собой поток (все наследует от IOBase
), так что обычное поведение IOBase
объекта, чтобы закрыть себя когда они выходят за рамки. BufferedIOBase
просто делегирует вызов close()
для основного потока.
Вы не должны просматривать BufferedReader
в качестве обтекателя потока (хотя это так и реализовано), но как литье типа существующего потока. Состояние двух потоков полностью связано. Однако вы можете отвязать обернутый поток с detach()
, но это оставляет объект BufferedIOBase
бесполезным.
Кроме того, io.open
возвращает BufferedReader
уже в режиме rb
, поэтому вы находитесь в двух буферизации. Вместо этого вы должны использовать io.FileIO
.
У вас есть несколько вариантов:
Создать новый поток и новый файловый дескриптор, и передавать вокруг имен файлов вместо потоков. Это ваш самый простой и безопасный вариант.
Создание необработанных файловых дескрипторов и создание потоков из них по мере необходимости. Это требует некоторого ухода за тем, что несколько потоков не используют один и тот же дескриптор файла одновременно. Например:
fd = os.open('test.txt', os.O_RDONLY)
file1 = FileIO(fd, 'r', closefd=False)
file2 = FileIO(fd, 'r', closefd=False)
file1.read(100)
assert file1.tell() == 100
file2.read(100)
assert file1.tell() == 200
detach()
основной поток до вашего BufferedIOBase
объекта закрывает поток. (Не забудьте перемотать поток!)
def test(streams):
for stream in streams:
b=TestReader(stream)
do_something(b)
wrappedstream = b.detach()
assert wrappedstream is stream
Вы можете даже реализовать это в деструкторе:
class TestReader(BufferedReader):
def __del__(self):
self.detach()
# self.raw will not be closed,
# rather left in the state it was in at detachment
Или просто отключить close()
делегации полностью, если вы считаете, что семантика ошибается:
class TestReader(BufferedReader):
def close(self):
self.closed = True
У меня нет большой картины того, что вы делаете (возможно, вам нужно другое des ign), но вот как я мог бы реализовать код, который я вижу:
from io import FileIO, BufferedReader
import io
import os
class TestReader(BufferedReader):
pass
def test(streams):
for stream in streams:
b = TestReader(stream)
def test_reset(streams):
"""Will try to leave stream state unchanged"""
for stream in streams:
pos = stream.tell()
b = TestReader(stream)
do_something(b)
b.detach()
stream.seek(pos)
filenames = ['test1.txt', 'test2.txt']
# option 1: just make new streams
streams = [FileIO(name, 'r') for name in filenames]
test(streams)
streams = [io.open(name, 'rb') for name in filenames]
#etc
# option 2: use file descriptors
fds = [os.open(name, os.O_RDONLY) for name in filenames]
#closefd = False means "do not close fd on __del__ or __exit__"
#this is only an option when you pass a fd instead of a file name
streams = [FileIO(fd, 'r', closefd=False) for fd in fds]
test(streams)
streams = []
for fd in fds:
os.lseek(fd, 0, os.SEEK_SET)
streams.append(io.open(fd, 'rb', closefd=False))
# you can also .seek(0) on the BufferedReader objects
# instead of os.lseek on the fds
# option 3: detach
streams = [FileIO(name, 'r') for name in filenames]
test_reset(streams)
# streams[*] should still be in the same state as when you passed it in
Фактически, в момент, когда появляется ваш комментарий, потоки все еще открыты. –
нет, они не делают, попробуйте сделать потоки [0] .read() – simonzack
Хм, ты прав, у меня была ошибка в моем тестовом скрипте. –