2016-12-19 10 views
1

У меня есть следующий код, используемый для интерполяции данных трехмерного тома.Ускорение интерполяции Python 3D

Y, X, Z = np.shape(volume) 
xs = np.arange(0, X) 
ys = np.arange(0, Y) 
zs = np.arange(0, Z) 

points = list(zip(np.ravel(result[:, :, :, 1]), np.ravel(result[:, :, :, 0]), np.ravel(result[:, :, :, 2]))) 
interp = interpolate.RegularGridInterpolator((ys, xs, zs), volume, 
              bounds_error=False, fill_value=0, method='linear') 
new_volume = interp(points) 
new_volume = np.reshape(new_volume, (Y, X, Z)) 

Этот код занимает около 37 секунд, чтобы выполнить на 512x512x110 объема (около 29 миллионов точек), что приводит к более чем на одну микросекунду на вокселе (что неприемлемо количество времени для меня - то, что больше он использует 4 ядра). Звонок new_volume=interp(points) занимает около 80% времени продеберации и создания списка почти в течение всего оставшегося времени.

Есть ли простой (или даже более сложный) способ сделать это вычисление быстрее? Или есть ли хорошая библиотека Python, которая обеспечивает быструю интерполяцию? Мой объем и точки меняются при каждом вызове этого продукта.

+0

Я не совсем уверен, что вы правильно выполняете интерполяцию (трудно сказать, не зная, что такое «результат»). Для массива такого размера 37 секунд не слишком странно. Генерация 'points' может быть значительно ускорена, используя' np.c_ [...] 'вместо' list (zip (...)) '. Посмотрите на [griddata] (https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.griddata.html), который займет еще больше времени. Если вам нужна более быстрая интерполяция в таком массиве, вам, вероятно, придется закодировать свой код CUDA и ускорить работу в графическом процессоре. –

+0

@Imanol Спасибо за упоминание функции np.c - я попробую это. 'Result [:,:,:, 1]' является сеткой Y, 'result [:,:,:, 0]' является сеткой X и так далее.Что касается реализации GPU - я тоже это сделаю, но все же - более чем на микросекунду для одной интерполяции вокселей с использованием 4 ядер достаточно высоко. Даже ускорение в 3-5 раз было бы потрясающе. – Nefarin

+0

Попробуйте использовать 'scipy.ndimage.map_coordinates', если вы хотите интерполировать на равную сетку. Обертка может быть найдена здесь (https://github.com/syrte/handy/blob/new/interpolate.py) –

ответ

2

Здесь слегка модифицированную версию cython решения:

import numpy as np 
cimport numpy as np 
from libc.math cimport floor 
from cython cimport boundscheck, wraparound, nonecheck, cdivision 

DTYPE = np.float 
ctypedef np.float_t DTYPE_t 

@boundscheck(False) 
@wraparound(False) 
@nonecheck(False) 
def interp3D(DTYPE_t[:,:,::1] v, DTYPE_t[:,:,::1] xs, DTYPE_t[:,:,::1] ys, DTYPE_t[:,:,::1] zs): 

    cdef int X, Y, Z 
    X,Y,Z = v.shape[0], v.shape[1], v.shape[2] 
    cdef np.ndarray[DTYPE_t, ndim=3] interpolated = np.zeros((X, Y, Z), dtype=DTYPE) 

    _interp3D(&v[0,0,0], &xs[0,0,0], &ys[0,0,0], &zs[0,0,0], &interpolated[0,0,0], X, Y, Z) 
    return interpolated 


@cdivision(True) 
cdef inline void _interp3D(DTYPE_t *v, DTYPE_t *x_points, DTYPE_t *y_points, DTYPE_t *z_points, 
       DTYPE_t *result, int X, int Y, int Z): 

    cdef: 
     int i, x0, x1, y0, y1, z0, z1, dim 
     DTYPE_t x, y, z, xd, yd, zd, c00, c01, c10, c11, c0, c1, c 

    dim = X*Y*Z 

    for i in range(dim): 
     x = x_points[i] 
     y = y_points[i] 
     z = z_points[i] 

     x0 = <int>floor(x) 
     x1 = x0 + 1 
     y0 = <int>floor(y) 
     y1 = y0 + 1 
     z0 = <int>floor(z) 
     z1 = z0 + 1 

     xd = (x-x0)/(x1-x0) 
     yd = (y-y0)/(y1-y0) 
     zd = (z-z0)/(z1-z0) 

     if x0 >= 0 and y0 >= 0 and z0 >= 0: 
      c00 = v[Y*Z*x0+Z*y0+z0]*(1-xd) + v[Y*Z*x1+Z*y0+z0]*xd 
      c01 = v[Y*Z*x0+Z*y0+z1]*(1-xd) + v[Y*Z*x1+Z*y0+z1]*xd 
      c10 = v[Y*Z*x0+Z*y1+z0]*(1-xd) + v[Y*Z*x1+Z*y1+z0]*xd 
      c11 = v[Y*Z*x0+Z*y1+z1]*(1-xd) + v[Y*Z*x1+Z*y1+z1]*xd 

      c0 = c00*(1-yd) + c10*yd 
      c1 = c01*(1-yd) + c11*yd 

      c = c0*(1-zd) + c1*zd 

     else: 
      c = 0 

     result[i] = c 

Результаты по-прежнему идентичны вашим. С помощью случайных данных сеточных 60x60x60 я получаю следующие тайминги:

SciPy's solution:    982ms 
Your cython solution:   24.7ms 
Above modified cython solution: 8.17ms 

Так что его почти в 4 раза быстрее, чем cython решения. Обратите внимание, что

  1. Cython выполняет проверку границ по умолчанию на массивах, и если вы хотите, что функция затем включите boundschecking удалить @boundscheck(False).
  2. В приведенном выше решении массивы должны быть C-смежный
  3. Если вы хотите параллельный вариант кода выше, замените range вместо prange в вашем for loop.

Надеюсь, это поможет.

+0

Спасибо, это действительно быстрее, но вычисляет неправильный результат (абсолютная разница между оригинальным решением/scipy интерполяция и эта версия слишком высока) и сбои на больших сетках (например, 200x200x20). – Nefarin

+0

@ Нефарин В коде было несколько опечаток. Мне пришлось изменить '+' на '*', и теперь результаты идентичны вашим. Как я уже упоминал ранее, убедитесь, что массивы, которые вы передаете этой функции, являются 'C-смежными'. – romeric

+0

Спасибо, сейчас работает, и после выпуска gil и использования prange это еще быстрее. – Nefarin

0

Я использовал Cython для ускорения этого и реализован следующий код:

import numpy as np 
cimport numpy as np 
np.import_array() 
from libc.math cimport ceil, floor 

DTYPE = np.float 
ctypedef np.float_t DTYPE_t 

def interp3(np.ndarray[DTYPE_t, ndim=3] x_grid, np.ndarray[DTYPE_t, ndim=3] y_grid, 
    np.ndarray[DTYPE_t, ndim=3] z_grid, np.ndarray[DTYPE_t, ndim=3] v, 
    np.ndarray[DTYPE_t, ndim=3] xs, np.ndarray[DTYPE_t, ndim=3] ys, 
    np.ndarray[DTYPE_t, ndim=3] zs): 

    cdef int i 
    cdef float x 
    cdef float y 
    cdef float z 
    cdef int x0 
    cdef int x1 
    cdef int y0 
    cdef int y1 
    cdef int z0 
    cdef int z1 
    cdef float xd 
    cdef float yd 
    cdef float zd 
    cdef float c00 
    cdef float c01 
    cdef float c10 
    cdef float c11 
    cdef float c0 
    cdef float c1 
    cdef float c 
    cdef int X 
    cdef int Y 
    cdef int Z 

    X, Y, Z = np.shape(x_grid) 

    cdef np.ndarray[DTYPE_t, ndim=1] x_points = np.ravel(xs) 
    cdef np.ndarray[DTYPE_t, ndim=1] y_points = np.ravel(ys) 
    cdef np.ndarray[DTYPE_t, ndim=1] z_points = np.ravel(zs) 
    cdef np.ndarray[DTYPE_t, ndim=1] result = np.empty((len(x_points)), dtype=DTYPE) 

    for i in range(len(x_points)): 
     x = x_points[i] 
     y = y_points[i] 
     z = z_points[i] 

     x0 = int(floor(x)) 
     x1 = x0 + 1 
     y0 = int(floor(y)) 
     y1 = y0 + 1 
     z0 = int(floor(z)) 
     z1 = z0 + 1 

     xd = (x-x0)/(x1-x0) 
     yd = (y-y0)/(y1-y0) 
     zd = (z-z0)/(z1-z0) 

     try: 
      assert x0 >= 0 and y0 >= 0 and z0 >= 0 
      c00 = v[x0, y0, z0]*(1-xd) + v[x1, y0, z0]*xd 
      c01 = v[x0, y0, z1]*(1-xd) + v[x1, y0, z1]*xd 
      c10 = v[x0, y1, z0]*(1-xd) + v[x1, y1, z0]*xd 
      c11 = v[x0, y1, z1]*(1-xd) + v[x1, y1, z1]*xd 

      c0 = c00*(1-yd) + c10*yd 
      c1 = c01*(1-yd) + c11*yd 

      c = c0*(1-zd) + c1*zd 
     except: 
      c = 0 

     result[i] = c 

    cdef np.ndarray[DTYPE_t, ndim=3] interpolated = np.zeros((X, Y, Z), dtype=DTYPE) 
    interpolated = np.reshape(result, (X, Y, Z)) 
    return interpolated 

Это мой первый раз с Cython, поэтому имеют следующие вопросы:

  1. Как я могу оптимизировать это дальше?

  2. Есть ли какой-либо простой способ избежать попыток и утверждать утверждения для проверки границ массива? Пытаясь обеспечить границы с мин/макс комбинаций медленнее, чем это пытаются/утверждать подход

В настоящее время она составляет около 8х быстрее, чем исходный код размещен выше.