2017-02-06 8 views
1

У меня есть 3 огромных файла CSV, содержащих климатические данные, каждый около 5 ГБ. Первая ячейка в каждой строке - это номер метеостанции (от 0 до 100 000), каждая станция содержит от 1 до 800 строк в каждом файле, что не обязательно равно во всех файлах. Например, станция 11 имеет 600, 500 и 200 строк в файлах1, file2 и file3 соответственно. Я хочу прочитать все строки каждой станции, выполнить некоторые операции над ними, затем записать результаты в другой файл, затем следующую станцию ​​и т. Д. Файлы слишком велики для загрузки сразу в памяти, поэтому Я пробовал некоторые решения читать их с минимальной нагрузкой памяти, как this post и this post, которые включают в себя этот метод:Чтение больших CSV-файлов из n-й строки в Python (не с самого начала)

with open(...) as f: 
    for line in f: 
     <do something with line> 

проблема с этим способом, который считывает файл с самого начала каждый раз, когда я хочу, чтобы читать файлы следующим образом :

for station in range (100798): 
    with open (file1) as f1, open (file2) as f2, open (file3) as f3: 
     for line in f1: 
      st = line.split(",")[0] 
      if st == station: 
       <store this line for some analysis> 
      else: 
       break # break the for loop and go to read the next file 
     for line in f2: 
      ... 
      <similar code to f1> 
      ... 
     for line in f3: 
      ... 
      <similar code to f1> 
      ... 
    <do the analysis to station, the go to next station> 

Проблема в том, что каждый раз, когда я начинаю считать следующую станцию, цикл for будет начинаться с начала, тогда как я хочу, чтобы он начинался с того места, где «Break» встречается в n-й строке, то есть для продолжения чтения файла.

Как я могу это сделать?

Заранее спасибо

Заметки о решениях ниже: Как я уже говорил ниже в то время я опубликовал свой ответ, я реализовал ответ @DerFaizio, но я нашел, что это очень медленно в обработке.

После того, как я попробовал генераторский ответ, представленный @ PM_2Ring, я нашел его очень быстрым. Может быть, потому что это зависит от генераторов.

Различия между двумя решениями можно заметить по количеству обрабатываемых станций за минуту , которые составляют 2500 ст/мин для решения на основе генератора, и 45 ст/мин для решения на основе Pandas. где Решение на базе генератора> в 55 раз быстрее.

Ниже приведены ссылки на ниже приведенные ниже описания. Большое спасибо всем участникам, особенно @ PM_2Ring.

+0

можно сохранить позицию файла, используя 'f1.tell()' и стремиться вернуться к нему в следующий раз. –

+0

Спасибо @ Jean-FrançoisFabre, но это занимает много времени, так как каждый файл содержит более 500 линий Milion. И, не сохраняя позицию file.tell(), я могу снова найти номер станции по мере сортировки. Еще раз спасибо за ваше предложение, но я думаю, что есть лучшее решение. –

+0

проблема в том, что строки имеют переменный размер, поэтому для достижения линии N вам нужно пройти все предыдущие строки хотя бы один раз (и затем кэшировать результат). Удачи с этим. –

ответ

2

Код ниже итерации по файлам по строкам, захватывая строки для каждой станции из каждого файла поочередно и добавляя их в список для дальнейшей обработки.

Сердцем этого кода является генератор file_buff, который дает строки файла, но который позволяет нам нажимать строку для последующего чтения. Когда мы читаем строку для следующей станции, мы можем отправить ее обратно на file_buff, чтобы мы могли перечитать ее, когда пришло время обрабатывать линии для этой станции.

Чтобы проверить этот код, я создал несколько простых поддельных данных станции, используя create_data.

from random import seed, randrange 

seed(123) 

station_hi = 5 
def create_data(): 
    ''' Fill 3 files with fake station data ''' 
    fbase = 'datafile_' 
    for fnum in range(1, 4): 
     with open(fbase + str(fnum), 'w') as f: 
      for snum in range(station_hi): 
       for i in range(randrange(1, 4)): 
        s = '{1} data{0}{1}{2}'.format(fnum, snum, i) 
        print(s) 
        f.write(s + '\n') 
     print() 

create_data() 

# A file buffer that you can push lines back to 
def file_buff(fh): 
    prev = None 
    while True: 
     while prev: 
      yield prev 
      prev = yield prev 
     prev = yield next(fh) 

# An infinite counter that yields numbers converted to strings 
def str_count(start=0): 
    n = start 
    while True: 
     yield str(n) 
     n += 1 

# Extract station data from all 3 files 
with open('datafile_1') as f1, open('datafile_2') as f2, open('datafile_3') as f3: 
    fb1, fb2, fb3 = file_buff(f1), file_buff(f2), file_buff(f3) 

    for snum_str in str_count(): 
     station_lines = [] 
     for fb in (fb1, fb2, fb3): 
      for line in fb: 
       #Extract station number string & station data 
       sid, sdata = line.split() 
       if sid != snum_str: 
        # This line contains data for the next station, 
        # so push it back to the buffer 
        rc = fb.send(line) 
        # and go to the next file 
        break 
       # Otherwise, append this data 
       station_lines.append(sdata) 

     #Process all the data lines for this station 
     if not station_lines: 
      #There's no more data to process 
      break 
     print('Station', snum_str) 
     print(station_lines) 

выход

0 data100 
1 data110 
1 data111 
2 data120 
3 data130 
3 data131 
4 data140 
4 data141 

0 data200 
1 data210 
2 data220 
2 data221 
3 data230 
3 data231 
3 data232 
4 data240 
4 data241 
4 data242 

0 data300 
0 data301 
1 data310 
1 data311 
2 data320 
3 data330 
4 data340 

Station 0 
['data100', 'data200', 'data300', 'data301'] 
Station 1 
['data110', 'data111', 'data210', 'data310', 'data311'] 
Station 2 
['data120', 'data220', 'data221', 'data320'] 
Station 3 
['data130', 'data131', 'data230', 'data231', 'data232', 'data330'] 
Station 4 
['data140', 'data141', 'data240', 'data241', 'data242', 'data340'] 

Этот код может справиться, если данные станции не хватает для конкретной станции из одного или двух файлов, но если он отсутствует из всех трех файлов, так как прерывает основной цикл обработки, когда список station_lines пуст, но это не должно быть проблемой для ваших данных.


Для получения подробной информации о генераторах и метод generator.send, пожалуйста, см 6.2.9. Yield expressions в документации.

Этот код был разработан с использованием Python 3, но он также будет запущен на Python 2.6+ (вам просто нужно включить from __future__ import print_function в начало скрипта).

Если у всех 3-х файлов отсутствуют идентификаторы станций, мы с легкостью справимся с этим. Просто используйте простой цикл range вместо бесконечного генератора str_count.

from random import seed, randrange 

seed(123) 

station_hi = 7 
def create_data(): 
    ''' Fill 3 files with fake station data ''' 
    fbase = 'datafile_' 
    for fnum in range(1, 4): 
     with open(fbase + str(fnum), 'w') as f: 
      for snum in range(station_hi): 
       for i in range(randrange(0, 2)): 
        s = '{1} data{0}{1}{2}'.format(fnum, snum, i) 
        print(s) 
        f.write(s + '\n') 
     print() 

create_data() 

# A file buffer that you can push lines back to 
def file_buff(fh): 
    prev = None 
    while True: 
     while prev: 
      yield prev 
      prev = yield prev 
     prev = yield next(fh) 

station_start = 0 
station_stop = station_hi 

# Extract station data from all 3 files 
with open('datafile_1') as f1, open('datafile_2') as f2, open('datafile_3') as f3: 
    fb1, fb2, fb3 = file_buff(f1), file_buff(f2), file_buff(f3) 

    for i in range(station_start, station_stop): 
     snum_str = str(i) 
     station_lines = [] 
     for fb in (fb1, fb2, fb3): 
      for line in fb: 
       #Extract station number string & station data 
       sid, sdata = line.split() 
       if sid != snum_str: 
        # This line contains data for the next station, 
        # so push it back to the buffer 
        rc = fb.send(line) 
        # and go to the next file 
        break 
       # Otherwise, append this data 
       station_lines.append(sdata) 

     if not station_lines: 
      continue 
     print('Station', snum_str) 
     print(station_lines) 

выход

1 data110 
3 data130 
4 data140 

0 data200 
1 data210 
2 data220 
6 data260 

0 data300 
4 data340 
6 data360 

Station 0 
['data200', 'data300'] 
Station 1 
['data110', 'data210'] 
Station 2 
['data220'] 
Station 3 
['data130'] 
Station 4 
['data140', 'data340'] 
Station 6 
['data260', 'data360'] 
+0

Спасибо большое @ PM_2Ring Этот код выглядит превосходным и умным, но мне интересно, почему вы преобразовали номер станции в строку в генераторе str_count? И что, если я хочу перебрать исходное количество станций 100797, так как там уже есть некоторые номера станций из всех трех файлов (есть больше файлов, которые содержат другие данные для отсутствующих станций, но я хочу обработать эти три файла температуры только.) –

+1

@MohammadElNesr Я преобразовал номер станции в строку в генераторе 'str_count', потому что нам нужно проверить строку номера станции для каждой прочитанной строки, и более эффективно сравнивать эти числовые строки с строкой, чем конвертировать для каждого сравнения - целое число. И я подумал, что лучше сделать это преобразование в генераторе, чем загромождать основной цикл с номером номера станции и номером номера станции. –

+1

@MohammadElNesr Прежде чем я начал писать этот код, я спросил, «Если каждый из этих 3 файлов содержит данные для каждого номера станции в диапазоне (100798)», и вы ответили, что это правильно. Мне нужно немного изменить логику, если это неверно. Но в моем часовом поясе уже поздно, и я, вероятно, не успею сделать это до завтра. –

0

Я бы предложил использовать pandas.read_csv. Вы можете указать строки, чтобы пропустить с помощью SkipRows, а также использовать достаточное количество строк для загрузки в зависимости от вашего размер_файла использования NROWS Вот ссылка на документацию: http://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html

+1

Во-первых, вы можете реализовать это достаточно легко, используя модуль 'csv', не нужно' pandas', и проблема в том, что OP не знает размер блока раньше времени ... по крайней мере так я интерпретировал Это. –

+2

OP хочет избежать повторного чтения файла –

-1

Идущая с встроенным csv, вы можете сделать что-то вроде:

with open(csvfile, 'r') as f: 
    reader = csv.reader(f, delimiter=',') 
    for i in range(n): 
     reader.next() 
    for row in reader: 
     print row # Or whatever you want to do here 

Где n - количество строк, которые вы хотите пропустить.

+0

Применить следующий на csv-ридере, потому что если там несколько строк строки, ваш код с трудом терпит неудачу. Это не имеет значения здесь, потому что OP не использует модуль csv, но все же. –

+0

@ Jean-FrançoisFabre, спасибо, отредактирован. Пропустил это. –

+0

Также ваше решение не подходит для OP. Он хочет быть расположенным на правильной линии, не читая строки с самого начала. –

0

Я отправил код ниже, прежде чем @ PM-2Ring разместил свое решение. Я хотел бы оставить оба решения активными:

Решение №1, которое зависит от библиотеки Pandas (by @DerFaizio).:

Это решение закончили 5450 станций в 120 минут (около 45 станций/мин)

import pandas as pd 
skips =[1, 1, 1] # to skip the header row forever 
for station_number in range(100798): 
    storage = {} 
    tmax = pd.read_csv(full_paths[0], skiprows=skips[0], header=None, nrows=126000, usecols=[0, 1, 3]) 
    tmin = pd.read_csv(full_paths[1], skiprows=skips[1], header=None, nrows=126000, usecols=[0, 1, 3]) 
    tavg = pd.read_csv(full_paths[2], skiprows=skips[2], header=None, nrows=126000, usecols=[0, 1, 3]) 

    # tmax is at position 0 
    for idx, station in enumerate(tmax[0]): 
     if station == station_number: 
      date_val = tmax[1][idx] 
      t_val = float(tmax[3][idx])/10. 
      storage[date_val] = [t_val, None, None] 
      skips[0] += 1 
     else: 
      break 
    # tmin is at position 1 
    for idx, station in enumerate(tmin[0]): 
     # station, date_val, _, val = lne.split(",") 
     if station == station_number: 
      date_val = tmin[1][idx] 
      t_val = float(tmin[3][idx])/10. 
      if date_val in storage: 
       storage[date_val][1] = t_val 
      else: 
       storage[date_val] = [None, t_val, None] 
      skips[1] += 1 
     else: 
      break 
    # tavg is at position 2 
    for idx, station in enumerate(tavg[0]): 
     ... 
     # similar to Tmin 
     ... 
     pass 

    station_info = [] 
    for key in storage.keys(): 
     # do some analysis 
     # Fill the list station_info 
     pass 
    data_out.writerows(station_info) 

Следующее решение является решением на основе генератора (от @ ПМ-2Ring)

Это решение закончило 30000 станций за 12 минут (около 2500 станций в минуту)

files = ['Tmax', 'Tmin', 'Tavg'] 
headers = ['Nesr_Id', 'r_Year', 'r_Month', 'r_Day', 'Tmax', 'Tmin', 'Tavg'] 

# A file buffer that you can push lines back to 
def file_buff(fh): 
    prev = None 
    while True: 
     while prev: 
      yield prev 
      prev = yield prev 
     prev = yield next(fh) 

# An infinite counter that yields numbers converted to strings 
def str_count(start=0): 
    n = start 
    while True: 
     yield str(n) 
     n += 1 

# NULL = -999.99 
print "Time started: {}".format(time.strftime('%Y-%m-%d %H:%M:%S')) 
with open('Results\\GHCN_Daily\\Important\\Temp_All_out_gen.csv', 'wb+') as out_file: 
    data_out = csv.writer(out_file, quoting=csv.QUOTE_NONE, quotechar='', delimiter=',', escapechar='\\', 
          lineterminator='\n') 
    data_out.writerow(headers) 
    full_paths = [os.path.join(source, '{}.csv'.format(file_name)) for file_name in files] 
    # Extract station data from all 3 files 
    with open(full_paths[0]) as f1, open(full_paths[1]) as f2, open(full_paths[0]) as f3: 
     fb1, fb2, fb3 = file_buff(f1), file_buff(f2), file_buff(f3) 

     for snum_str in str_count(): 
      # station_lines = [] 
      storage ={} 
      count = [0, 0, 0] 
      for file_id, fb in enumerate((fb1, fb2, fb3)): 
       for line in fb: 
        if not isinstance(get__proper_data_type(line.split(",")[0]), str): 
         # Extract station number string & station data 
         sid, date_val, _dummy, sdata = line.split(",") 
         if sid != snum_str: 
          # This line contains data for the next station, 
          # so push it back to the buffer 
          rc = fb.send(line) 
          # and go to the next file 
          break 
         # Otherwise, append this data 
         sdata = float(sdata)/10. 
         count[file_id] += 1 
         if date_val in storage: 
          storage[date_val][file_id] = sdata 
         else: 
          storage[date_val]= [sdata, None, None] 
         # station_lines.append(sdata) 

      # # Process all the data lines for this station 
      # if not station_lines: 
      #  # There's no more data to process 
      #  break 
      print "St# {:6d}/100797. Time: {}. Tx({}), Tn({}), Ta({}) ".\ 
       format(int(snum_str), time.strftime('%H:%M:%S'), count[0], count[1], count[2]) 
      # print(station_lines) 

      station_info = [] 
      for key in storage.keys(): 
       # key_val = storage[key] 
       tx, tn, ta = storage[key] 
       if ta is None: 
        if tx != None and tn != None: 
         ta = round((tx + tn)/2., 1) 
       if tx is None: 
        if tn != None and ta != None: 
         tx = round(2. * ta - tn, 1) 
       if tn is None: 
        if tx != None and ta != None: 
         tn = round(2. * ta - tx, 1) 
       # print key, 
       py_date = from_excel_ordinal(int(key)) 
       # print py_date 
       station_info.append([snum_str, py_date.year, py_date.month, py_date.day, tx, tn, ta]) 

      data_out.writerows(station_info) 
      del station_info 

Спасибо за все.

+0

'для ключа в storage.keys():' неэффективен в Python 2. Он должен построить список ключей словаря, прежде чем он сможет начать перебирать их. Вы можете выполнять итерацию непосредственно над клавишами, используя 'для ключа в хранилище:'. В Python 3 это нормально, потому что 'dict.keys()' возвращает динамический объект View (который подобен набору и который адаптируется к изменениям в базовом dict), а не список, но он все же более чистый для записи 'для хранения ключа : '. –