Добрый день,Dask - Рекурсивная или массивная нарезка, вызывающая большую память?
Я искал некоторую помощь с пониманием чрезмерного (или возможно не) использования памяти в моей цепочке обработки Dask.
Проблема возникает от выполнения следующих функций:
def create_fft_arrays(master_array, fft_size, overlap):
input_shape = master_array.shape[0]
# Determine zero pad length
zero_len = fft_size - ((input_shape - fft_size) % ((1-overlap) * fft_size))
zeros = da.zeros((zero_len, master_array.shape[1]),
dtype = master_array.dtype,
chunks = (zero_len, master_array.shape[1]))
# Create the reshaped array
reshape_array = da.concatenate((master_array, zeros), axis = 0)
# Create an index series to use to index the reshaped array for re-blocking.
fft_index = np.arange(0, reshape_array.shape[0] - (fft_size -1), fft_size * overlap)
# Break reshape_array into fft size chunks
fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index]
# Returns list of dask arrays
return [array.rechunk(array.shape) for array in fft_arrays]
Где master_array
является Dask Array
слишком большой, чтобы держать в памяти (703 57600001 точек в данном случае).
В качестве минимального, например, следующие вызывает такое же использование памяти как полный код ниже
import dask.array as da
import numpy as np
def create_fft_arrays(master_array, fft_size, overlap):
input_shape = master_array.shape[0]
# Determine zero pad length
zero_len = fft_size - ((input_shape - fft_size) % ((1-overlap) * fft_size))
zeros = da.zeros((zero_len, master_array.shape[1]),
dtype = master_array.dtype,
chunks = (zero_len, master_array.shape[1]))
# Create the reshaped array
reshape_array = da.concatenate((master_array, zeros), axis = 0)
# Create an index series to use to index the reshaped array for re-blocking.
fft_index = np.arange(0, reshape_array.shape[0] - (fft_size -1), fft_size * overlap)
# Break reshape_array into fft size chunks
fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index]
# Returns list of dask arrays
return [array.rechunk(array.shape) for array in fft_arrays]
# Fabricate an input array of the same shape and size as the problematic dataset
master_array = da.random.normal(10, 0.1, size = (703, 57600001), chunks = (703, 372))
# Execute the create_fft_arrays function
fft_arrays = create_fft_arrays(master_array.T, 2**15, 0.5)
Чтобы поместить код в контексте выполнения следующего кода вызывает у меня ОЗУ (20GB) для макс вне при выполнении последней строки fft_arrays = create_fft_arrays(master_array.T, FFT_SIZE, 0.5)
:
import dask.array as da
import h5py as h5
import numpy as np
import os
FORMAT = '.h5'
DSET_PATH = '/DAS/Data'
TSET_PATH = '/DAS/Time'
FFT_SIZE = 2**15
OVERLAP = 0.5
input_dir = r'D:\'
file_paths = []
# Get list of all valid files in directory
for dir_name, sub_dir, f_name in os.walk(input_dir):
for f in f_name:
if f[-1*len(FORMAT):] == FORMAT:
file_paths.append(os.path.join(dir_name, f))
#H5 object for each file
file_handles = [h5.File(f_path, 'r') for f_path in file_paths]
# Handle for dataset and timestamps from each file
dset_handles = [f[DSET_PATH] for f in file_handles]
tset_handles = [f[TSET_PATH] for f in file_handles]
# Create a Dask Array object for each dataset and timestamp set
dset_arrays = [da.from_array(dset, chunks = dset.chunks) for dset in dset_handles]
tset_arrays = [da.from_array(tset, chunks = tset.chunks) for tset in tset_handles]
# Concatenate all datasets along along the time axis
master_array = da.concatenate(dset_arrays, axis = 1)
def create_fft_arrays(master_array, fft_size, overlap):
input_shape = master_array.shape[0]
# Determine zero pad length
zero_len = fft_size - ((input_shape - fft_size) % ((1-overlap) * fft_size))
zeros = da.zeros((zero_len, master_array.shape[1]),
dtype = master_array.dtype,
chunks = (zero_len, master_array.shape[1]))
# Create the reshaped array
reshape_array = da.concatenate((master_array, zeros), axis = 0)
# Create an index series to use to index the reshaped array for re-blocking.
fft_index = np.arange(0, reshape_array.shape[0] - (fft_size -1), fft_size * overlap)
# Break reshape_array into fft size chunks
fft_arrays = [reshape_array[x:x + fft_size] for x in fft_index]
# Returns list of dask arrays
return [array.rechunk(array.shape) for array in fft_arrays]
# Break master_array into FFT sized arrays with a single chunk in each
fft_arrays = create_fft_arrays(master_array.T, FFT_SIZE, 0.5)
После этого я пошел бы, чтобы использовать метод da.fft.fft
для расчета частотной характеристики каждого из этих FFT массивов.
Любая помощь или рекомендации, было бы весьма признателен,
Джордж
Возможно, вы получите лучший ответ раньше, если сможете создать [mcve] (http://stackoverflow.com/help/mcve). – MRocklin
Исправлено, надеюсь, это может быть немного яснее. –
Проходя по строкам, это, как представляется, линия, использующая большие объемы памяти: 'fft_arrays = [reshape_array [x: x + FFT_SIZE] для x в fft_index]'. Он вызывает операцию разрезания? –