2017-02-21 28 views
1

У меня есть большое количество файлов, которые я хочу загрузить, выполнить некоторую обработку, а затем сохранить обработанные данные. Для этого у меня есть следующий код:Использование joblib заставляет python потреблять все большее количество ОЗУ при запуске скрипта

from os import listdir 
from os.path import dirname, abspath, isfile, join 
import pandas as pd 
import sys 
import time 
# Multi-threading 
from joblib import Parallel, delayed 
import multiprocessing 

# Number of cores 
TOTAL_NUM_CORES = multiprocessing.cpu_count() 
# Path of this script's file 
FILES_PATH = dirname(abspath(__file__)) 

def read_and_convert(f,num_files): 
    # Read the file 
    dataframe = pd.read_csv(FILES_PATH + '\\Tick\\' + f, low_memory=False, header=None, names=['Symbol', 'Date_Time', 'Bid', 'Ask'], index_col=1, parse_dates=True) 
    # Resample the data to have minute-to-minute data, Open-High-Low-Close format. 
    data_bid = dataframe['Bid'].resample('60S').ohlc() 
    data_ask = dataframe['Ask'].resample('60S').ohlc() 
    # Concatenate the OLHC data 
    data_ask_bid = pd.concat([data_bid, data_ask], axis=1, keys=['Bid', 'Ask']) 
    # Keep only non-weekend data (from Monday 00:00 until Friday 22:00) 
    data_ask_bid = data_ask_bid[(((data_ask_bid.index.weekday >= 0) & (data_ask_bid.index.weekday <= 3)) | ((data_ask_bid.index.weekday == 4) & (data_ask_bid.index.hour < 22)))] 
    # Save the processed and concatenated data of each month in a different folder 
    data_ask_bid.to_csv(FILES_PATH + '\\OHLC\\' + f) 
    print(f) 

def main(): 
    start_time = time.time() 
    # Get the paths for all the tick data files 
    files_names = [f for f in listdir(FILES_PATH + '\\Tick\\') if isfile(join(FILES_PATH + '\\Tick\\', f))] 

    num_cores = int(TOTAL_NUM_CORES/2) 
    print('Converting Tick data to OHLC...') 
    print('Using ' + str(num_cores) + ' cores.') 
    # Open and convert files in parallel 
    Parallel(n_jobs=num_cores)(delayed(read_and_convert)(f,len(files_names)) for f in files_names) 
    # for f in files_names: read_and_convert(f,len(files_names)) # non-parallel 
    print("\nTook %s seconds." % (time.time() - start_time)) 

if __name__ == "__main__": 
    main() 

Первые несколько файлов обрабатываются очень быстро этот путь, но скорость начинает получать неряшливо, как скрипт обрабатывает дальше и дальнейшие файлы. По мере обработки большего количества файлов ОЗУ становится все более полным, как показано ниже. Разве это не joblib, очищающий необработанные данные, когда он циклически проходит через файлы?

enter image description here

ответ

0

Добавление gc.collect() к последней строке функции вы работаете на параллельно избегает RAM от получения насыщенными. gc.collect() - сборщик мусора Python.