Я хочу использовать RxPy для открытия файла (csv) и обработки файла по строкам. Мой точно представить себе, чтобы иметь следующие шагиRxPy читает файлы csv и производственные линии
- обеспечивает имя файла в поток
- открыть файл
- файл чтения построчно
- удалить строки, которые начинаются с комментарием (например, # .. .)
- применять читателя CSV
- фильтра записи, соответствующие определенным критериям
До сих пор у меня есть:
def to_file(filename):
f = open(filename)
return Observable.using(
lambda: AnonymousDisposable(lambda: f.close()),
lambda d: Observable.just(f)
)
def to_reader(f):
return csv.reader(f)
def print_rows(reader):
for row in reader:
print(row)
Это работает
Observable.from_(["filename.csv", "filename2.csv"])
.flat_map(to_file).**map**(to_reader).subscribe(print_rows)
Это не: ValueError: операции ввода/вывода в закрытом файле
Observable.from_(["filename.csv", "filename2.csv"])
.flat_map(to_file).**flat_map**(to_rows).subscribe(print)
2-й не работает, потому что (см. https://github.com/ReactiveX/RxPY/issues/69)
When the observables from the first flatmap is merged by the second flatmap, the inner subscriptions will be disposed when they complete. Thus the files will be closed, even if the file handles are on_next'ed into the new observable set up by the second flatmap.
Любая идея, как я могу достичь: Что-то вроде:
Observable.from_(["filename.csv", "filename2.csv"]
).flat_map(to_file
).filter(comment_lines
).filter(empty_lines
).map(to_csv_reader
).filter(filter_by..)
).do whatever
Большое спасибо за вашу помощь
Юргена