2016-12-29 4 views
1

Я хочу использовать RxPy для открытия файла (csv) и обработки файла по строкам. Мой точно представить себе, чтобы иметь следующие шагиRxPy читает файлы csv и производственные линии

  1. обеспечивает имя файла в поток
  2. открыть файл
  3. файл чтения построчно
  4. удалить строки, которые начинаются с комментарием (например, # .. .)
  5. применять читателя CSV
  6. фильтра записи, соответствующие определенным критериям

До сих пор у меня есть:

def to_file(filename): 
f = open(filename) 
return Observable.using(
    lambda: AnonymousDisposable(lambda: f.close()), 
    lambda d: Observable.just(f) 
) 

def to_reader(f): 
    return csv.reader(f) 

def print_rows(reader): 
    for row in reader: 
     print(row) 

Это работает

Observable.from_(["filename.csv", "filename2.csv"]) 
    .flat_map(to_file).**map**(to_reader).subscribe(print_rows) 

Это не: ValueError: операции ввода/вывода в закрытом файле

Observable.from_(["filename.csv", "filename2.csv"]) 
    .flat_map(to_file).**flat_map**(to_rows).subscribe(print) 

2-й не работает, потому что (см. https://github.com/ReactiveX/RxPY/issues/69)

When the observables from the first flatmap is merged by the second flatmap, the inner subscriptions will be disposed when they complete. Thus the files will be closed, even if the file handles are on_next'ed into the new observable set up by the second flatmap.

Любая идея, как я могу достичь: Что-то вроде:

Observable.from_(["filename.csv", "filename2.csv"] 
    ).flat_map(to_file 
    ).filter(comment_lines 
    ).filter(empty_lines 
    ).map(to_csv_reader 
    ).filter(filter_by..) 
    ).do whatever 

Большое спасибо за вашу помощь

Юргена

ответ

0

Я только начал работать с RxPy в последнее время и нужно делать то же самое. Удивленный кто-то еще не ответил на ваш вопрос, но решил ответить на случай, если кто-то еще должен знать. Предполагая, что у вас есть CSV-файл, как это:

$ cat datafile.csv 
"iata","airport","city","state","country","lat","long" 
"00M","Thigpen ","Bay Springs","MS","USA",31.95376472,-89.23450472 
"00R","Livingston Municipal","Livingston","TX","USA",30.68586111,-95.01792778 
"00V","Meadow Lake","Colorado Springs","CO","USA",38.94574889,-104.5698933 
"01G","Perry-Warsaw","Perry","NY","USA",42.74134667,-78.05208056 
"01J","Hilliard Airpark","Hilliard","FL","USA",30.6880125,-81.90594389 

Вот решение:

from rx import Observable 
from csv import DictReader 

Observable.from_(DictReader(open('datafile.csv', 'r'))) \ 
      .subscribe(lambda row: 
        print("{0:3}\t{1:<35}".format(row['iata'], row['airport'][:35])) 
     )