2016-10-25 4 views
0

Добрый день,Dask - Рекурсивная или массивная нарезка, вызывающая большую память?

Я искал некоторую помощь с пониманием чрезмерного (или возможно не) использования памяти в моей цепочке обработки Dask.

Проблема возникает от выполнения следующих функций:

def create_fft_arrays(master_array, fft_size, overlap): 

    input_shape = master_array.shape[0] 
    # Determine zero pad length 
    zero_len = fft_size - ((input_shape - fft_size) % ((1-overlap) * fft_size)) 

    zeros = da.zeros((zero_len, master_array.shape[1]), 
        dtype = master_array.dtype, 
        chunks = (zero_len, master_array.shape[1])) 
    # Create the reshaped array 
    reshape_array = da.concatenate((master_array, zeros), axis = 0) 
    # Create an index series to use to index the reshaped array for re-blocking. 
    fft_index = np.arange(0, reshape_array.shape[0] - (fft_size -1), fft_size * overlap) 
    # Break reshape_array into fft size chunks 
    fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index] 

    # Returns list of dask arrays 
    return [array.rechunk(array.shape) for array in fft_arrays] 

Где master_array является Dask Array слишком большой, чтобы держать в памяти (703 57600001 точек в данном случае).

В качестве минимального, например, следующие вызывает такое же использование памяти как полный код ниже

import dask.array as da 
import numpy as np 

def create_fft_arrays(master_array, fft_size, overlap): 

    input_shape = master_array.shape[0] 
    # Determine zero pad length 
    zero_len = fft_size - ((input_shape - fft_size) % ((1-overlap) * fft_size)) 

    zeros = da.zeros((zero_len, master_array.shape[1]), 
        dtype = master_array.dtype, 
        chunks = (zero_len, master_array.shape[1])) 
    # Create the reshaped array 
    reshape_array = da.concatenate((master_array, zeros), axis = 0) 
    # Create an index series to use to index the reshaped array for re-blocking. 
    fft_index = np.arange(0, reshape_array.shape[0] - (fft_size -1), fft_size * overlap) 
    # Break reshape_array into fft size chunks 
    fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index] 

    # Returns list of dask arrays 
    return [array.rechunk(array.shape) for array in fft_arrays] 

# Fabricate an input array of the same shape and size as the problematic dataset 
master_array = da.random.normal(10, 0.1, size = (703, 57600001), chunks = (703, 372)) 

# Execute the create_fft_arrays function 
fft_arrays = create_fft_arrays(master_array.T, 2**15, 0.5) 

Чтобы поместить код в контексте выполнения следующего кода вызывает у меня ОЗУ (20GB) для макс вне при выполнении последней строки fft_arrays = create_fft_arrays(master_array.T, FFT_SIZE, 0.5):

import dask.array as da 

import h5py as h5 
import numpy as np 

import os 

FORMAT = '.h5' 
DSET_PATH = '/DAS/Data' 
TSET_PATH = '/DAS/Time' 

FFT_SIZE = 2**15 
OVERLAP = 0.5 

input_dir = r'D:\' 
file_paths = [] 

# Get list of all valid files in directory 
for dir_name, sub_dir, f_name in os.walk(input_dir): 
    for f in f_name: 
     if f[-1*len(FORMAT):] == FORMAT: 
      file_paths.append(os.path.join(dir_name, f)) 

#H5 object for each file 
file_handles = [h5.File(f_path, 'r') for f_path in file_paths] 

# Handle for dataset and timestamps from each file 
dset_handles = [f[DSET_PATH] for f in file_handles] 
tset_handles = [f[TSET_PATH] for f in file_handles] 

# Create a Dask Array object for each dataset and timestamp set 
dset_arrays = [da.from_array(dset, chunks = dset.chunks) for dset in dset_handles] 
tset_arrays = [da.from_array(tset, chunks = tset.chunks) for tset in tset_handles] 

# Concatenate all datasets along along the time axis 
master_array = da.concatenate(dset_arrays, axis = 1) 

def create_fft_arrays(master_array, fft_size, overlap): 

    input_shape = master_array.shape[0] 
    # Determine zero pad length 
    zero_len = fft_size - ((input_shape - fft_size) % ((1-overlap) * fft_size)) 

    zeros = da.zeros((zero_len, master_array.shape[1]), 
        dtype = master_array.dtype, 
        chunks = (zero_len, master_array.shape[1])) 
    # Create the reshaped array 
    reshape_array = da.concatenate((master_array, zeros), axis = 0) 
    # Create an index series to use to index the reshaped array for re-blocking. 
    fft_index = np.arange(0, reshape_array.shape[0] - (fft_size -1), fft_size * overlap) 
    # Break reshape_array into fft size chunks 
    fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index] 

    # Returns list of dask arrays 
    return [array.rechunk(array.shape) for array in fft_arrays] 

# Break master_array into FFT sized arrays with a single chunk in each 
fft_arrays = create_fft_arrays(master_array.T, FFT_SIZE, 0.5) 

После этого я пошел бы, чтобы использовать метод da.fft.fft для расчета частотной характеристики каждого из этих FFT массивов.

Любая помощь или рекомендации, было бы весьма признателен,

Джордж

+0

Возможно, вы получите лучший ответ раньше, если сможете создать [mcve] (http://stackoverflow.com/help/mcve). – MRocklin

+0

Исправлено, надеюсь, это может быть немного яснее. –

+0

Проходя по строкам, это, как представляется, линия, использующая большие объемы памяти: 'fft_arrays = [reshape_array [x: x + FFT_SIZE] для x в fft_index]'. Он вызывает операцию разрезания? –

ответ

0

Ваш мастер массив имеет очень много кусков

>>> master_array = da.random.normal(10, 0.1, size = (703, 57600001), chunks = (703, 372)) 
>>> master_array.npartitions 
154839 

Существует несколько административные накладные расходы для каждого блока, так это хорошо держите номер несколько меньше этого. Это section on chunks от dask.array documentation

Ваше узкое место возникает, когда вы пытаетесь нарезать этот массив тысячи раз.

Ваша проблема может быть несколько решена путем увеличения детализации. Приведенный выше документ содержит некоторые рекомендации.

+0

Спасибо за совет. –

+0

Спасибо за совет. Я начал с этого размера куска, так как это размер куска h5. В конечном итоге данные IO с диска будут узким местом, но я хотел поэкспериментировать с Dask, чтобы попытаться сделать эту рабочую шкалу для экземпляра EC2. Увеличение размера фрагмента для 'master_array', по-видимому, позволяет мне построить график как минимум. Спасибо за помощь. –

 Смежные вопросы

  • Нет связанных вопросов^_^