2017-01-31 26 views
16

Может кто-нибудь помочь мне переписать эту функцию (функция doTheMath) выполнить вычисления на графическом процессоре? Я использовал несколько хороших дней, пытаясь окунуться в него, но никакого результата. Интересно, может быть, кто-нибудь может помочь мне переписать эту функцию так, как вам кажется, что это похоже на журнал, поскольку я даю тот же результат в конце. Я пытался использовать @jit от numba, но по какой-то причине он на самом деле намного медленнее, чем запуск кода, как обычно. С огромным размером выборки цель заключается в том, чтобы значительно сократить время выполнения, поэтому я считаю, что GPU - это самый быстрый способ сделать это.Python: переписать циклическую математическую функцию numpy для работы на GPU

Я немного объясню, что на самом деле происходит. Реальные данные, которые выглядят почти идентичными, как образцы данных, созданных в приведенном ниже коде, делятся на типовые размеры приблизительно 5.000.000 строк каждого образца или около 150 МБ на файл. Всего около 600.000.000 строк или 20 ГБ данных. Я должен процитировать эти данные, образец по образцу, а затем строку за строкой в ​​каждом примере, взять последние 2000 (или другие) строки по каждой строке и запустить функцию doTheMath, которая возвращает результат. Затем этот результат сохраняется на жестком диске, где я могу сделать с ним другую работу. Как вы можете видеть ниже, мне не нужны все результаты всех строк, только те, которые больше определенной суммы. Если я запустил свою функцию, как сейчас, на python, я получаю около 62 секунд на 1.000.000 строк. Это очень длительное время, учитывая все данные и как быстро это нужно делать.

Должен отметить, что я загружаю файл реальных данных по файлу в ОЗУ с помощью data = joblib.load(file), поэтому загрузка данных не является проблемой, так как занимает около 0,29 секунды на файл. После загрузки я запускаю весь код ниже. Наибольшее время занимает функция doTheMath. Я готов отдать все свои 500 очков репутации, которые у меня есть на stackoverflow, в награду за кого-то, кто хочет помочь мне переписать этот простой код для запуска на GPU. Я заинтересован в GPU, я действительно хочу посмотреть, как это делается по этой проблеме.

РЕДАКТИРОВАТЬ/ОБНОВЛЕНИЕ 1: Вот ссылка на небольшую выборку реальных данных: data_csv.zip О 102000 рядов реальной data1 и 2000 строк для реальной data2a и data2b. Используйте minimumLimit = 400 на данных реального образца

EDIT/UPDATE 2: Для тех, кто следит этот пост здесь краткое резюме ниже ответов. До сих пор у нас есть 4 ответа на исходное решение. Тот, который предлагает @Divakar, - это всего лишь хитрости к исходному коду. Из двух настроек только первая применима к этой проблеме, вторая - хорошая настройка, но здесь она не применяется. Из трех других ответов два из них - решения на основе процессоров, и один тензорный процессор GPU. Tensorflow-GPU от Paul Panzer кажется многообещающим, но когда я фактически запускаю его на GPU, он медленнее оригинала, поэтому код все еще нуждается в улучшении.

Другие два решения на базе процессоров представлены @PaulPanzer (чистое решение numpy) и @MSeifert (решение numba). Оба решения дают очень хорошие результаты и обе технологические данные чрезвычайно быстрые по сравнению с исходным кодом. Из них один, представленный Полом Панцером, быстрее. Он обрабатывает около 1.000.000 строк за 3 секунды. Единственная проблема заключается в меньших размерах пакета, это можно преодолеть либо переключением на решение numba, предлагаемое MSeifert, либо даже исходным кодом после всех настроек, которые обсуждались ниже.

Я очень доволен и благодарен @PaulPanzer и @MSeifert за то, что они сделали на их ответах. Тем не менее, поскольку это вопрос о решении на базе GPU, я ожидаю, если кто-то захочет попробовать его в версии GPU и посмотреть, насколько быстрее данные будут обрабатываться на графическом процессоре по сравнению с текущим процессором решения.Если не будет никаких других ответов превосходит показатели @ чистого раствора Numpy PaulPanzer в то я приму его ответ как правого и получает награду :)

EDIT/UPDATE 3: @Divakar опубликовал новый ответ с решение для графического процессора. После моих тестов на реальных данных скорость даже не сравнима с решениями для сопоставления процессоров. Процессор GPU обрабатывает около 5.000.000 за 1,5 секунды. Это невероятно :) Я очень взволнован решением GPU, и я благодарю @Divakar за его публикацию. Как и я благодарю @PaulPanzer и @MSeifert для их решения CPU :) Теперь мое исследование продолжается с невероятной скоростью из-за ГПУ :)

import pandas as pd 
import numpy as np 
import time 

def doTheMath(tmpData1, data2a, data2b): 
    A = tmpData1[:, 0] 
    B = tmpData1[:,1] 
    C = tmpData1[:,2] 
    D = tmpData1[:,3] 
    Bmax = B.max() 
    Cmin = C.min() 
    dif = (Bmax - Cmin) 
    abcd = ((((A - Cmin)/dif) + ((B - Cmin)/dif) + ((C - Cmin)/dif) + ((D - Cmin)/dif))/4) 
    return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum() 

#Declare variables 
batchSize = 2000 
sampleSize = 5000000 
resultArray = [] 
minimumLimit = 490 #use 400 on the real sample data 

#Create Random Sample Data 
data1 = np.matrix(np.random.uniform(1, 100, (sampleSize + batchSize, 4))) 
data2a = np.matrix(np.random.uniform(0, 1, (batchSize, 1))) #upper limit 
data2b = np.matrix(np.random.uniform(0, 1, (batchSize, 1))) #lower limit 
#approx. half of data2a will be smaller than data2b, but that is only in the sample data because it is randomly generated, NOT the real data. The real data2a is always higher than data2b. 


#Loop through the data 
t0 = time.time() 
for rowNr in range(data1.shape[0]): 
    tmp_df = data1[rowNr:rowNr + batchSize] #rolling window 
    if(tmp_df.shape[0] == batchSize): 
     result = doTheMath(tmp_df, data2a, data2b) 
     if (result >= minimumLimit): 
      resultArray.append([rowNr , result]) 
print('Runtime:', time.time() - t0) 

#Save data results 
resultArray = np.array(resultArray) 
print(resultArray[:,1].sum()) 
resultArray = pd.DataFrame({'index':resultArray[:,0], 'result':resultArray[:,1]}) 
resultArray.to_csv("Result Array.csv", sep=';') 

ПК спецификации я работаю:

GTX970(4gb) video card; 
i7-4790K CPU 4.00Ghz; 
16GB RAM; 
a SSD drive 
running Windows 7; 

Как побочный вопрос, поможет ли вторая видеоплата в SLI по этой проблеме?

+0

SLI не имеет значения и не имеет ничего общего с CUDA. Что касается того, как вы можете преобразовать этот код - вы делаете это, садясь перед своим компьютером и вводите новый код ядра CUDA на свой компьютер. И если вы хотите запустить его на двух графических процессорах, вы также вводите код API для управления запуском кода на двух графических процессорах. – talonmies

+0

Вы всегда можете попробовать [numba] (http://numba.pydata.org/), который может * попробовать * автоматически использовать CUDA в некоторой степени. Лучшим подходом было бы использовать графики вычислений Theano/Tensorflow и реализовать алгоритм в их рамках для его компиляции для графических процессоров. Но да, в общем, это о том, чтобы знать CUDA и разрабатывать для вас свой алгоритм, используя доступные инструменты, такие как упоминаемые вами когти. – sascha

+0

Благодарим вас за предложение @sascha. Я думал, что Theano и Tensorflow предназначены только для проблем машинного обучения. Я увижу в numba на данный момент – RaduS

ответ

5

Введение и код решения

Ну, вы просили об этом! Таким образом, перечисленные в этом сообщении - это реализация с PyCUDA, которая использует легкие обертки, расширяющие большинство возможностей CUDA в среде Python. Мы будем использовать его функциональность SourceModule, которая позволяет нам писать и компилировать ядра CUDA, находящиеся в среде Python.

Чтобы понять, в чем состоит проблема, у нас есть максимальные и минимальные скользящие максимальные и минимальные различия, различия и сравнения.Для максимальной и минимальной частей, которая включает в себя обнаружение максимального количества блоков (для каждого скользящего окна), мы будем использовать технику уменьшения, как описано более подробно here. Это будет сделано на уровне блоков. Для итераций верхнего уровня в раздвижных окнах мы будем использовать индексирование уровня сетки в ресурсы CUDA. Для получения дополнительной информации об этом блоке и формате сетки, пожалуйста, обратитесь к page-18. PyCUDA также поддерживает встроенные функции для вычисления сокращений, таких как max и min, но мы теряем контроль, особенно мы намерены использовать специализированную память, такую ​​как общая и постоянная память, для использования GPU на оптимальном уровне.

Листинг кода решения PyCuda-NumPy -

1] PyCuda часть -

import pycuda.autoinit 
import pycuda.driver as drv 
import numpy as np 
from pycuda.compiler import SourceModule 

mod = SourceModule(""" 
#define TBP 1024 // THREADS_PER_BLOCK 

__device__ void get_Bmax_Cmin(float* out, float *d1, float *d2, int L, int offset) 
{ 
    int tid = threadIdx.x; 
    int inv = TBP; 
    __shared__ float dS[TBP][2]; 

    dS[tid][0] = d1[tid+offset]; 
    dS[tid][1] = d2[tid+offset];   
    __syncthreads(); 

    if(tid<L-TBP) 
    { 
     dS[tid][0] = fmaxf(dS[tid][0] , d1[tid+inv+offset]); 
     dS[tid][1] = fminf(dS[tid][1] , d2[tid+inv+offset]); 
    } 
    __syncthreads(); 
    inv = inv/2; 

    while(inv!=0) 
    { 
     if(tid<inv) 
     { 
      dS[tid][0] = fmaxf(dS[tid][0] , dS[tid+inv][0]); 
      dS[tid][1] = fminf(dS[tid][1] , dS[tid+inv][1]); 
     } 
     __syncthreads(); 
     inv = inv/2; 
    } 
    __syncthreads(); 

    if(tid==0) 
    { 
     out[0] = dS[0][0]; 
     out[1] = dS[0][1]; 
    } 
    __syncthreads(); 
} 

__global__ void main1(float* out, float *d0, float *d1, float *d2, float *d3, float *lowL, float *highL, int *BLOCKLEN) 
{ 
    int L = BLOCKLEN[0]; 
    int tid = threadIdx.x; 
    int iterID = blockIdx.x; 
    float Bmax_Cmin[2]; 
    int inv; 
    float Cmin, dif; 
    __shared__ float dS[TBP*2]; 

    get_Bmax_Cmin(Bmax_Cmin, d1, d2, L, iterID); 
    Cmin = Bmax_Cmin[1]; 
    dif = (Bmax_Cmin[0] - Cmin); 

    inv = TBP; 

    dS[tid] = (d0[tid+iterID] + d1[tid+iterID] + d2[tid+iterID] + d3[tid+iterID] - 4.0*Cmin)/(4.0*dif); 
    __syncthreads(); 

    if(tid<L-TBP) 
     dS[tid+inv] = (d0[tid+inv+iterID] + d1[tid+inv+iterID] + d2[tid+inv+iterID] + d3[tid+inv+iterID] - 4.0*Cmin)/(4.0*dif);     

    dS[tid] = ((dS[tid] >= lowL[tid]) & (dS[tid] <= highL[tid])) ? 1 : 0; 
    __syncthreads(); 

    if(tid<L-TBP) 
     dS[tid] += ((dS[tid+inv] >= lowL[tid+inv]) & (dS[tid+inv] <= highL[tid+inv])) ? 1 : 0; 
    __syncthreads(); 

    inv = inv/2; 
    while(inv!=0) 
    { 
     if(tid<inv) 
      dS[tid] += dS[tid+inv]; 
     __syncthreads(); 
     inv = inv/2; 
    } 

    if(tid==0) 
     out[iterID] = dS[0]; 
    __syncthreads(); 

} 
""") 

Пожалуйста, обратите внимание, что THREADS_PER_BLOCK, TBP должен быть установлен на основе batchSize. Эмпирическое правило состоит в том, чтобы присвоить значение 2 значения TBP, которое меньше, чем batchSize. Таким образом, для batchSize = 2000 нам понадобилось TBP как 1024.

2] NumPy часть -

def gpu_app_v1(A, B, C, D, batchSize, minimumLimit): 
    func1 = mod.get_function("main1") 
    outlen = len(A)-batchSize+1 

    # Set block and grid sizes 
    BSZ = (1024,1,1) 
    GSZ = (outlen,1) 

    dest = np.zeros(outlen).astype(np.float32) 
    N = np.int32(batchSize) 
    func1(drv.Out(dest), drv.In(A), drv.In(B), drv.In(C), drv.In(D), \ 
        drv.In(data2b), drv.In(data2a),\ 
        drv.In(N), block=BSZ, grid=GSZ) 
    idx = np.flatnonzero(dest >= minimumLimit) 
    return idx, dest[idx] 

Бенчмаркинг

я испытал на GTX 960m. Обратите внимание, что PyCUDA ожидает, что массивы будут непрерывного порядка. Итак, нам нужно обрезать столбцы и делать копии. Я ожидаю/предполагаю, что данные могут быть прочитаны из файлов, так что данные распределяются по строкам, а не как столбцы. Таким образом, пока мы не будем отслеживать эту функцию.

Оригинальный подход -

def org_app(data1, batchSize, minimumLimit): 
    resultArray = [] 
    for rowNr in range(data1.shape[0]-batchSize+1): 
     tmp_df = data1[rowNr:rowNr + batchSize] #rolling window 
     result = doTheMath(tmp_df, data2a, data2b) 
     if (result >= minimumLimit): 
      resultArray.append([rowNr , result]) 
    return resultArray 

таймингов и верификация -

In [2]: #Declare variables 
    ...: batchSize = 2000 
    ...: sampleSize = 50000 
    ...: resultArray = [] 
    ...: minimumLimit = 490 #use 400 on the real sample data 
    ...: 
    ...: #Create Random Sample Data 
    ...: data1 = np.random.uniform(1, 100000, (sampleSize + batchSize, 4)).astype(np.float32) 
    ...: data2b = np.random.uniform(0, 1, (batchSize)).astype(np.float32) 
    ...: data2a = data2b + np.random.uniform(0, 1, (batchSize)).astype(np.float32) 
    ...: 
    ...: # Make column copies 
    ...: A = data1[:,0].copy() 
    ...: B = data1[:,1].copy() 
    ...: C = data1[:,2].copy() 
    ...: D = data1[:,3].copy() 
    ...: 
    ...: gpu_out1,gpu_out2 = gpu_app_v1(A, B, C, D, batchSize, minimumLimit) 
    ...: cpu_out1,cpu_out2 = np.array(org_app(data1, batchSize, minimumLimit)).T 
    ...: print(np.allclose(gpu_out1, cpu_out1)) 
    ...: print(np.allclose(gpu_out2, cpu_out2)) 
    ...: 
True 
False 

Итак, есть некоторые различия между CPU и GPU подсчетами. Исследуем их -

In [7]: idx = np.flatnonzero(~np.isclose(gpu_out2, cpu_out2)) 

In [8]: idx 
Out[8]: array([12776, 15208, 17620, 18326]) 

In [9]: gpu_out2[idx] - cpu_out2[idx] 
Out[9]: array([-1., -1., 1., 1.]) 

Существует четыре случая несоответствия. Они отключены при максимальном значении 1. После исследования я наткнулся на некоторые сведения об этом. В принципе, поскольку мы используем математические функции для вычислений max и min, и те, которые, я думаю, вызывают, что последний двоичный бит в плавающем представлении pt отличается от аналога процессора. Это называется ошибкой ULP и подробно обсуждается here и here.

Наконец, кладя вопрос в сторону, давайте перейдем к самому важному биту, производительность -

In [10]: %timeit org_app(data1, batchSize, minimumLimit) 
1 loops, best of 3: 2.18 s per loop 

In [11]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit) 
10 loops, best of 3: 82.5 ms per loop 

In [12]: 2180.0/82.5 
Out[12]: 26.424242424242426 

Давайте попробуем с большими наборами данных. С sampleSize = 500000, мы получаем -

In [14]: %timeit org_app(data1, batchSize, minimumLimit) 
1 loops, best of 3: 23.2 s per loop 

In [15]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit) 
1 loops, best of 3: 821 ms per loop 

In [16]: 23200.0/821 
Out[16]: 28.25822168087698 

Таким образом, увеличение скорости остается постоянным на уровне около 27.

Ограничения:

1) Мы используем float32 числа, а графические процессоры работают лучше всего с теми. Двойная точность, особенно на несерверных графических процессорах, не очень популярна, когда дело доходит до производительности, и поскольку вы работаете с таким GPU, я тестировал с помощью float32.

Дальнейшее усовершенствование:

1) Мы могли бы использовать более constant memory кормить в data2a и data2b, а не использовать global memory.

+0

в конечном итоге решение для графического процессора :) спасибо @ Дивакар. Я буду проверять его на реальных данных позже и вернуться с результатами. – RaduS

+1

@RaduS Обязательно ознакомьтесь с отредактированными кодами (только что отредактированными) для бенчмаркинга! Теперь он принимает любой произвольный 'batchSize'. – Divakar

+0

Привет @Divakar, я провел несколько интенсивных тестов по множеству реальных данных со сценарием. И скорость действительно впечатляет. Но результаты ошибочны. Выход слишком велик для любых данных. Массив результатов должен содержать только строки, размер которых больше минимального. Вот ссылка [link] (https://mab.to/VHZbzwW8I), где я загрузил образец 102000 строк реальных данных, а также ваш скрипт, запускающий его. Исходный сценарий, который вы опубликовали в своем решении, также дает неверный результат, я думаю, это потому, что вы меняете data2a для соответствия функции графического процессора, но это не тот же вход для функции CPU. – RaduS

9

Tweak # 1

Его обычно советуют векторизации вещи при работе с NumPy массивами. Но с очень большими массивами, я думаю, что у вас нет вариантов. Таким образом, чтобы повысить производительность, можно сделать небольшую настройку на последнем этапе суммирования.

Мы могли бы заменить шаг, который делает массив 1s и 0s и не суммирующий:

np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum() 

с np.count_nonzero, который работает эффективно рассчитывать True значения в булевой массиве, вместо преобразования в 1s и 0s -

np.count_nonzero((abcd <= data2a) & (abcd >= data2b)) 

тест Продолжительность -

In [45]: abcd = np.random.randint(11,99,(10000)) 

In [46]: data2a = np.random.randint(11,99,(10000)) 

In [47]: data2b = np.random.randint(11,99,(10000)) 

In [48]: %timeit np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum() 
10000 loops, best of 3: 81.8 µs per loop 

In [49]: %timeit np.count_nonzero((abcd <= data2a) & (abcd >= data2b)) 
10000 loops, best of 3: 28.8 µs per loop 

Tweak # 2

Используйте предварительно вычисленных взаимными при рассмотрении дел, которые проходят неявное вещание. Дополнительная информация here. Таким образом, магазин обратна dif и использовать вместо на шаге:

((((A - Cmin)/dif) + ((B - Cmin)/dif) + ... 

Образец тест -

In [52]: A = np.random.rand(10000) 

In [53]: dif = 0.5 

In [54]: %timeit A/dif 
10000 loops, best of 3: 25.8 µs per loop 

In [55]: %timeit A*(1.0/dif) 
100000 loops, best of 3: 7.94 µs per loop 

У вас есть четыре места, используя деление на dif. Так что, надеюсь, это вызовет заметный импульс!

+0

привет @ Дивакар, касающийся tweak # 2, я прочитал сообщение, в которое вы связались, и попытался его реализовать. но это швы, что я не получаю тот же результат. возможно, я делаю что-то неправильно. Можете ли вы взглянуть на него? возможно, вам легче обнаружить ошибку 'dif = 1.0/(Bmax - Cmin)', а затем 'abcd = ((dif * A) + ((dif * B) + (dif * C) + (dif * D))/4) ' – RaduS

+0

@RaduS Ну, если' Bmax' и 'Cmin' близки, то« Bmax-Cmin »будет небольшим числом, и его обратное будет большим числом. Итак, позже, когда массивы будут умножены на это число, мы будем иметь разные числа. Итак, мы, возможно, пропустим эту настройку. – Divakar

2

Это технически не по теме (не GPU), но я уверен, что вас это заинтересует.

Существует один очевидный и довольно большая экономия:

предвычисления A + B + C + D (не в цикле, по всей данных: data1.sum(axis=-1)), потому что abcd = ((A+B+C+D) - 4Cmin)/(4dif). Это сэкономит немало операционных инструментов.

Удивлены никто не заметил, что один, прежде чем ;-)

Edit:

Существует еще одна вещь, хотя я подозреваю, что это только в вашем примере, а не в реальных данных:

Как он стоит примерно половина data2a будет меньше data2b. В этих местах ваши условия на abcd не могут быть истинными, поэтому вам не нужно даже вычислять abcd.

Edit:

еще один твик я ниже, но забыл упомянуть: Если вычислить максимальную (или мин) над движущимся окном. Когда вы перемещаете одну точку вправо, скажите, насколько вероятен максимум для изменения? Есть только две вещи, которые могут ее изменить: новая точка справа больше (происходит примерно один раз в период времени окна, и даже если это происходит, вы сразу же знаете новый максимум), или старый максимум падает с окна на left (также бывает примерно раз в windowlength times). Только в этом последнем случае вам нужно искать все окно для нового max.

Edit:

не удержался дать ему попробовать в tensorflow. У меня нет GPU, поэтому вы сами должны проверить его на скорость. Поместите «gpu» для «cpu» на отмеченную строку.

На процессоре он примерно в два раза быстрее, чем исходная реализация (т. Е. Без настроек Дивакара). Обратите внимание, что я позволил изменить входы от матрицы к простому массиву. В настоящее время tensorflow - это немного движущаяся цель, поэтому убедитесь, что у вас есть правильная версия. Я использовал Python3.6 и tf 0.12.1. Если вы делаете pip3 install tensorflow-gpu, то сегодня должен может работать.

import numpy as np 
import time 
import tensorflow as tf 

# currently the max/min code is sequential 
# thus 
parallel_iterations = 1 
# but you can put this in a separate loop, precompute and then try and run 
# the remainder of doTheMathTF with a larger parallel_iterations 

# tensorflow is quite capricious about its data types 
ddf = tf.float64 
ddi = tf.int32 

def worker(data1, data2a, data2b): 
    ################################### 
    # CHANGE cpu to gpu in next line! # 
    ################################### 
    with tf.device('/cpu:0'): 
     g = tf.Graph() 
     with g.as_default(): 
      ABCD = tf.constant(data1.sum(axis=-1), dtype=ddf) 
      B = tf.constant(data1[:, 1], dtype=ddf) 
      C = tf.constant(data1[:, 2], dtype=ddf) 
      window = tf.constant(len(data2a)) 
      N = tf.constant(data1.shape[0] - len(data2a) + 1, dtype=ddi) 
      data2a = tf.constant(data2a, dtype=ddf) 
      data2b = tf.constant(data2b, dtype=ddf) 
      def doTheMathTF(i, Bmax, Bmaxind, Cmin, Cminind, out): 
       # most of the time we can keep the old max/min 
       Bmaxind = tf.cond(Bmaxind<i, 
            lambda: i + tf.to_int32(
             tf.argmax(B[i:i+window], axis=0)), 
            lambda: tf.cond(Bmax>B[i+window-1], 
                lambda: Bmaxind, 
                lambda: i+window-1)) 
       Cminind = tf.cond(Cminind<i, 
            lambda: i + tf.to_int32(
             tf.argmin(C[i:i+window], axis=0)), 
            lambda: tf.cond(Cmin<C[i+window-1], 
                lambda: Cminind, 
                lambda: i+window-1)) 
       Bmax = B[Bmaxind] 
       Cmin = C[Cminind] 
       abcd = (ABCD[i:i+window] - 4 * Cmin) * (1/(4 * (Bmax-Cmin))) 
       out = out.write(i, tf.to_int32(
        tf.count_nonzero(tf.logical_and(abcd <= data2a, 
                abcd >= data2b)))) 
       return i + 1, Bmax, Bmaxind, Cmin, Cminind, out 
      with tf.Session(graph=g) as sess: 
       i, Bmaxind, Bmax, Cminind, Cmin, out = tf.while_loop(
        lambda i, _1, _2, _3, _4, _5: i<N, doTheMathTF, 
        (tf.Variable(0, dtype=ddi), tf.Variable(0.0, dtype=ddf), 
        tf.Variable(-1, dtype=ddi), 
        tf.Variable(0.0, dtype=ddf), tf.Variable(-1, dtype=ddi), 
        tf.TensorArray(ddi, size=N)), 
        shape_invariants=None, 
        parallel_iterations=parallel_iterations, 
        back_prop=False) 
       out = out.pack() 
       sess.run(tf.initialize_all_variables()) 
       out, = sess.run((out,)) 
    return out 

#Declare variables 
batchSize = 2000 
sampleSize = 50000#00 
resultArray = [] 

#Create Sample Data 
data1 = np.random.uniform(1, 100, (sampleSize + batchSize, 4)) 
data2a = np.random.uniform(0, 1, (batchSize,)) 
data2b = np.random.uniform(0, 1, (batchSize,)) 

t0 = time.time() 
out = worker(data1, data2a, data2b) 
print('Runtime (tensorflow):', time.time() - t0) 


good_indices, = np.where(out >= 490) 
res_tf = np.c_[good_indices, out[good_indices]] 

def doTheMath(tmpData1, data2a, data2b): 
    A = tmpData1[:, 0] 
    B = tmpData1[:,1] 
    C = tmpData1[:,2] 
    D = tmpData1[:,3] 
    Bmax = B.max() 
    Cmin = C.min() 
    dif = (Bmax - Cmin) 
    abcd = ((((A - Cmin)/dif) + ((B - Cmin)/dif) + ((C - Cmin)/dif) + ((D - Cmin)/dif))/4) 
    return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum() 

#Loop through the data 
t0 = time.time() 
for rowNr in range(sampleSize+1): 
    tmp_df = data1[rowNr:rowNr + batchSize] #rolling window 
    result = doTheMath(tmp_df, data2a, data2b) 
    if (result >= 490): 
     resultArray.append([rowNr , result]) 
print('Runtime (original):', time.time() - t0) 
print(np.alltrue(np.array(resultArray)==res_tf)) 
+0

Благодарю вас за ответ Пол. Я проверил код на двух отдельных компьютерах как с установленной Windows, так и с Python3.5 и tf 0.12.1. По какой-то причине версия tensorflow медленнее оригинала, даже если я активирую GPU, она все еще медленнее оригинала. Вот некоторые статистические данные: Pc1 не имеет установленного графического процессора: 'Runtime (tensorflow): 9.321721315383911 Runtime (оригинал): 3.7472479343414307 True' Pc2 с установленным и активированным GPU:' Runtime (tensorflow): 72.36511659622192 Время выполнения (оригинал): 3.5680108070373535 True' – RaduS

+0

I также получите предупреждение «ПРЕДУПРЕЖДЕНИЕ: Тензорный поток: От C: /testings.py: 61 у рабочего: initialize_all_variables (from tensorflow.python.ops.variables) устарел и будет удален после 2017-03-02. Инструкции по обновлению: вместо этого используйте tf.global_variables_initializer. '' – RaduS

+0

Это был всего лишь тест на код, который вы отправили без каких-либо изменений данных или размера выборки. Это может быть медленнее, потому что это Windows? или потому что у меня есть python 3.5, а не 3.6? Или есть еще одна причина? – RaduS

4

Перед началом настройки цели (GPU) или использовать что-нибудь еще (то есть параллельные казни), вы можете рассмотреть вопрос о том, как улучшить уже существующий код. Вы использовали -tag, поэтому я буду использовать его, чтобы улучшить код: Сначала мы работаем над массивами не на матрицах:

data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4))) 
data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit 
data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit 

Каждый раз, когда вы звоните doTheMath вы ожидаете целое назад, однако вы используете много массивы и создать много промежуточных массивов:

abcd = ((((A - Cmin)/dif) + ((B - Cmin)/dif) + ((C - Cmin)/dif) + ((D - Cmin)/dif))/4) 
return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum() 

Это создает промежуточный массив каждый шаг:

  • tmp1 = A-Cmin,
  • tmp2 = tmp1/dif,
  • tmp3 = B - Cmin,
  • tmp4 = tmp3/dif
  • ... вы получите суть.

Однако это функция уменьшения (array -> integer), поэтому наличие большого количества промежуточных массивов - лишний вес, просто вычислите значение «fly».

import numba as nb 

@nb.njit 
def doTheMathNumba(tmpData, data2a, data2b): 
    Bmax = np.max(tmpData[:, 1]) 
    Cmin = np.min(tmpData[:, 2]) 
    diff = (Bmax - Cmin) 
    idiff = 1/diff 
    sum_ = 0 
    for i in range(tmpData.shape[0]): 
     val = (tmpData[i, 0] + tmpData[i, 1] + tmpData[i, 2] + tmpData[i, 3])/4 * idiff - Cmin * idiff 
     if val <= data2a[i] and val >= data2b[i]: 
      sum_ += 1 
    return sum_ 

я сделал что-то еще здесь, чтобы избежать нескольких операций:

(((A - Cmin)/dif) + ((B - Cmin)/dif) + ((C - Cmin)/dif) + ((D - Cmin)/dif))/4 
= ((A - Cmin + B - Cmin + C - Cmin + D - Cmin)/dif)/4 
= (A + B + C + D - 4 * Cmin)/(4 * dif) 
= (A + B + C + D)/(4 * dif) - (Cmin/dif) 

Это фактически сокращает время выполнения почти в 10 раз на моем компьютере:

%timeit doTheMath(tmp_df, data2a, data2b)  # 1000 loops, best of 3: 446 µs per loop 
%timeit doTheMathNumba(tmp_df, data2a, data2b) # 10000 loops, best of 3: 59 µs per loop 

Есть конечно же, и другие улучшения, например, используя min/max для расчета Bmax и Cmin, что сделало бы, по меньшей мере, часть расчета ru n в O(sampleSize) вместо O(samplesize * batchsize). Это также позволило бы повторно использовать некоторые из расчетов (A + B + C + D)/(4 * dif) - (Cmin/dif), потому что, если Cmin и Bmax не изменяются для следующего образца, эти значения не отличаются. Это немного сложно сделать, потому что сравнения различаются. Но определенно возможно! Смотрите здесь:

import time 
import numpy as np 
import numba as nb 

@nb.njit 
def doTheMathNumba(abcd, data2a, data2b, Bmax, Cmin): 
    diff = (Bmax - Cmin) 
    idiff = 1/diff 
    quarter_idiff = 0.25 * idiff 
    sum_ = 0 
    for i in range(abcd.shape[0]): 
     val = abcd[i] * quarter_idiff - Cmin * idiff 
     if val <= data2a[i] and val >= data2b[i]: 
      sum_ += 1 
    return sum_ 

@nb.njit 
def doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, resultArray): 
    found = 0 
    for rowNr in range(data1.shape[0]): 
     if(abcd[rowNr:rowNr + batchSize].shape[0] == batchSize): 
      result = doTheMathNumba(abcd[rowNr:rowNr + batchSize], 
            data2a, data2b, Bmaxs[rowNr], Cmins[rowNr]) 
      if (result >= minimumLimit): 
       resultArray[found, 0] = rowNr 
       resultArray[found, 1] = result 
       found += 1 
    return resultArray[:found] 

#Declare variables 
batchSize = 2000 
sampleSize = 50000 
resultArray = [] 
minimumLimit = 490 #use 400 on the real sample data 

data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4))) 
data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit 
data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit 

from scipy import ndimage 
t0 = time.time() 
abcd = np.sum(data1, axis=1) 
Bmaxs = ndimage.maximum_filter1d(data1[:, 1], 
           size=batchSize, 
           origin=-((batchSize-1)//2-1)) # correction for even shapes 
Cmins = ndimage.minimum_filter1d(data1[:, 2], 
           size=batchSize, 
           origin=-((batchSize-1)//2-1)) 

result = np.zeros((sampleSize, 2), dtype=np.int64) 
doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, result) 
print('Runtime:', time.time() - t0) 

Это дает мне Runtime: 0.759593152999878 (после Numba скомпилирована функция!), В то время как оригинал Тука было Runtime: 24.68975639343262. Теперь мы в 30 раз быстрее!

С вашим размером выборки по-прежнему требуется Runtime: 60.187848806381226, но это не так уж плохо, не так ли?

И даже если я этого не сделал сам, говорит, что можно написать "Numba for CUDA GPUs", и это не кажется сложным.

+0

«Это также позволило бы повторно использовать некоторые из (A + B + C + D)/(4 * dif) - (Cmin/dif) вычислений, потому что, если Cmin и Bmax не изменение для следующего образца эти значения не отличаются. Это немного сложно ... «Сделано именно это, опубликует через несколько минут. Это фаст, и я использую чистый numpy. –

+0

ОК, мне нужно исправить мой предыдущий оператор, потому что я сделал что-то не так, это всего лишь в 30 раз быстрее :( – MSeifert

+0

@PaulPanzer Да, можно реализовать все эти функции снова (вместо использования scipy-фильтров), но я думаю, что вы numpy код довольно свернут, а на моем компьютере также медленнее (не так много, но почти в 2 раза медленнее). Поэтому я не думаю, что это преимущество «использовать чистый numpy» здесь. Кроме того: Numba может фактически скомпилировать код для графических процессоров, Я сделал это сам. :) – MSeifert

4

Вот некоторый код, чтобы продемонстрировать, что можно, просто подстраивая алгоритм. Это чистый NumPy, но на данных выборки вы публикуемую дает примерно 35x прирост скорости по сравнению с оригинальной версией (~ 1000000 сэмплы ~ 2.5sec на моей довольно скромной машине):

>>> result_dict = master('run') 
[('load', 0.82578349113464355), ('precomp', 0.028138399124145508), ('max/min', 0.24333405494689941), ('ABCD', 0.015314102172851562), ('main', 1.3356468677520752)] 
TOTAL 2.44821691513 

Tweaks используется:

  • A + B + C + D, см. Мой другой ответ
  • работает мин/макс, в том числе избегая вычислить (A + B + C + D - 4Cmin)/(4dif) несколько раз с тем же Cmin/dif.

Это более или менее обычное дело. Это оставляет сравнение с data2a/b, которое является дорогостоящим O (NK), где N - количество выборок, а K - размер окна. Здесь можно воспользоваться относительно хорошо выполненными данными. Используя min/max, можно создать варианты data2a/b, которые могут быть использованы для одновременного тестирования диапазона сдвигов окон, если тест не удался, все эти смещения могут быть немедленно исключены, в противном случае диапазон делит пополам.

import numpy as np 
import time 

# global variables; they will hold the precomputed pre-screening filters 
preA, preB = {}, {} 
CHUNK_SIZES = None 

def sliding_argmax(data, K=2000): 
    """compute the argmax of data over a sliding window of width K 

    returns: 
     indices -- indices into data 
     switches -- window offsets at which the maximum changes 
        (strictly speaking: where the index of the maximum changes) 
        excludes 0 but includes maximum offset (len(data)-K+1) 

    see last line of compute_pre_screening_filter for a recipe to convert 
    this representation to the vector of maxima 
    """ 
    N = len(data) 
    last = np.argmax(data[:K]) 
    indices = [last] 
    while indices[-1] <= N - 1: 
     ge = np.where(data[last + 1 : last + K + 1] > data[last])[0] 
     if len(ge) == 0: 
      if last + K >= N: 
       break 
      last += 1 + np.argmax(data[last + 1 : last + K + 1]) 
      indices.append(last) 
     else: 
      last += 1 + ge[0] 
      indices.append(last) 
    indices = np.array(indices) 
    switches = np.where(data[indices[1:]] > data[indices[:-1]], 
         indices[1:] + (1-K), indices[:-1] + 1) 
    return indices, np.r_[switches, [len(data)-K+1]] 


def compute_pre_screening_filter(bound, n_offs): 
    """compute pre-screening filter for point-wise upper bound 

    given a K-vector of upper bounds B and K+n_offs-1-vector data 
    compute K+n_offs-1-vector filter such that for each index j 
    if for any offset 0 <= o < n_offs and index 0 <= i < K such that 
    o + i = j, the inequality B_i >= data_j holds then filter_j >= data_j 

    therefore the number of data points below filter is an upper bound for 
    the maximum number of points below bound in any K-window in data 
    """ 
    pad_l, pad_r = np.min(bound[:n_offs-1]), np.min(bound[1-n_offs:]) 
    padded = np.r_[pad_l+np.zeros(n_offs-1,), bound, pad_r+np.zeros(n_offs-1,)] 
    indices, switches = sliding_argmax(padded, n_offs) 
    return padded[indices].repeat(np.diff(np.r_[[0], switches])) 


def compute_all_pre_screening_filters(upper, lower, min_chnk=5, dyads=6): 
    """compute upper and lower pre-screening filters for data blocks of 
    sizes K+n_offs-1 where 
    n_offs = min_chnk, 2min_chnk, ..., 2^(dyads-1)min_chnk 

    the result is stored in global variables preA and preB 
    """ 
    global CHUNK_SIZES 

    CHUNK_SIZES = min_chnk * 2**np.arange(dyads) 
    preA[1] = upper 
    preB[1] = lower 
    for n in CHUNK_SIZES: 
     preA[n] = compute_pre_screening_filter(upper, n) 
     preB[n] = -compute_pre_screening_filter(-lower, n) 


def test_bounds(block, counts, threshold=400): 
    """test whether the windows fitting in the data block 'block' fall 
    within the bounds using pre-screening for efficient bulk rejection 

    array 'counts' will be overwritten with the counts of compliant samples 
    note that accurate counts will only be returned for above threshold 
    windows, because the analysis of bulk rejected windows is short-circuited 

    also note that bulk rejection only works for 'well behaved' data and 
    for example not on random numbers 
    """ 
    N = len(counts) 
    K = len(preA[1]) 
    r = N % CHUNK_SIZES[0] 
    # chop up N into as large as possible chunks with matching pre computed 
    # filters 
    # start with small and work upwards 
    counts[:r] = [np.count_nonzero((block[l:l+K] <= preA[1]) & 
            (block[l:l+K] >= preB[1])) 
        for l in range(r)] 

    def bisect(block, counts): 
     M = len(counts) 
     cnts = np.count_nonzero((block <= preA[M]) & (block >= preB[M])) 
     if cnts < threshold: 
      counts[:] = cnts 
      return 
     elif M == CHUNK_SIZES[0]: 
      counts[:] = [np.count_nonzero((block[l:l+K] <= preA[1]) & 
              (block[l:l+K] >= preB[1])) 
         for l in range(M)] 
     else: 
      M //= 2 
      bisect(block[:-M], counts[:M]) 
      bisect(block[M:], counts[M:]) 

    N = N // CHUNK_SIZES[0] 
    for M in CHUNK_SIZES: 
     if N % 2: 
      bisect(block[r:r+M+K-1], counts[r:r+M]) 
      r += M 
     elif N == 0: 
      return 
     N //= 2 
    else: 
     for j in range(2*N): 
      bisect(block[r:r+M+K-1], counts[r:r+M]) 
      r += M 


def analyse(data, use_pre_screening=True, min_chnk=5, dyads=6, 
      threshold=400): 
    samples, upper, lower = data 
    N, K = samples.shape[0], upper.shape[0] 
    times = [time.time()] 
    if use_pre_screening: 
     compute_all_pre_screening_filters(upper, lower, min_chnk, dyads) 
    times.append(time.time()) 
    # compute switching points of max and min for running normalisation 
    upper_inds, upper_swp = sliding_argmax(samples[:, 1], K) 
    lower_inds, lower_swp = sliding_argmax(-samples[:, 2], K) 
    times.append(time.time()) 
    # sum columns 
    ABCD = samples.sum(axis=-1) 
    times.append(time.time()) 
    counts = np.empty((N-K+1,), dtype=int) 
    # main loop 
    # loop variables: 
    offs = 0 
    u_ind, u_scale, u_swp = 0, samples[upper_inds[0], 1], upper_swp[0] 
    l_ind, l_scale, l_swp = 0, samples[lower_inds[0], 2], lower_swp[0] 
    while True: 
     # check which is switching next, min(C) or max(B) 
     if u_swp > l_swp: 
      # greedily take the largest block possible such that dif and Cmin 
      # do not change 
      block = (ABCD[offs:l_swp+K-1] - 4*l_scale) \ 
        * (0.25/(u_scale-l_scale)) 
      if use_pre_screening: 
       test_bounds(block, counts[offs:l_swp], threshold=threshold) 
      else: 
       counts[offs:l_swp] = [ 
        np.count_nonzero((block[l:l+K] <= upper) & 
            (block[l:l+K] >= lower)) 
        for l in range(l_swp - offs)] 
      # book keeping 
      l_ind += 1 
      offs = l_swp 
      l_swp = lower_swp[l_ind] 
      l_scale = samples[lower_inds[l_ind], 2] 
     else: 
      block = (ABCD[offs:u_swp+K-1] - 4*l_scale) \ 
        * (0.25/(u_scale-l_scale)) 
      if use_pre_screening: 
       test_bounds(block, counts[offs:u_swp], threshold=threshold) 
      else: 
       counts[offs:u_swp] = [ 
        np.count_nonzero((block[l:l+K] <= upper) & 
            (block[l:l+K] >= lower)) 
        for l in range(u_swp - offs)] 
      u_ind += 1 
      if u_ind == len(upper_inds): 
       assert u_swp == N-K+1 
       break 
      offs = u_swp 
      u_swp = upper_swp[u_ind] 
      u_scale = samples[upper_inds[u_ind], 1] 
    times.append(time.time()) 
    return {'counts': counts, 'valid': np.where(counts >= 400)[0], 
      'timings': np.diff(times)} 


def master(mode='calibrate', data='fake', use_pre_screening=True, nrep=3, 
      min_chnk=None, dyads=None): 
    t = time.time() 
    if data in ('fake', 'load'): 
     data1 = np.loadtxt('data1.csv', delimiter=';', skiprows=1, 
          usecols=[1,2,3,4]) 
     data2a = np.loadtxt('data2a.csv', delimiter=';', skiprows=1, 
          usecols=[1]) 
     data2b = np.loadtxt('data2b.csv', delimiter=';', skiprows=1, 
          usecols=[1]) 
     if data == 'fake': 
      data1 = np.tile(data1, (10, 1)) 
     threshold = 400 
    elif data == 'random': 
     data1 = np.random.random((102000, 4)) 
     data2b = np.random.random(2000) 
     data2a = np.random.random(2000) 
     threshold = 490 
     if use_pre_screening or mode == 'calibrate': 
      print('WARNING: pre-screening not efficient on artificial data') 
    else: 
     raise ValueError("data mode {} not recognised".format(data)) 
    data = data1, data2a, data2b 
    t_load = time.time() - t 
    if mode == 'calibrate': 
     min_chnk = (2, 3, 4, 5, 6) if min_chnk is None else min_chnk 
     dyads = (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) if dyads is None else dyads 
     timings = np.zeros((len(min_chnk), len(dyads))) 
     print('max bisect ' + ' '.join([ 
      ' n.a.' if dy == 0 else '{:7d}'.format(dy) for dy in dyads]), 
       end='') 
     for i, mc in enumerate(min_chnk): 
      print('\nmin chunk {}'.format(mc), end=' ') 
      for j, dy in enumerate(dyads): 
       for k in range(nrep): 
        if dy == 0: # no pre-screening 
         timings[i, j] += analyse(
          data, False, mc, dy, threshold)['timings'][3] 
        else: 
         timings[i, j] += analyse(
          data, True, mc, dy, threshold)['timings'][3] 
       timings[i, j] /= nrep 
       print('{:7.3f}'.format(timings[i, j]), end=' ', flush=True) 
     best_mc, best_dy = np.unravel_index(np.argmin(timings.ravel()), 
              timings.shape) 
     print('\nbest', min_chnk[best_mc], dyads[best_dy]) 
     return timings, min_chnk[best_mc], dyads[best_dy] 
    if mode == 'run': 
     min_chnk = 2 if min_chnk is None else min_chnk 
     dyads = 5 if dyads is None else dyads 
     res = analyse(data, use_pre_screening, min_chnk, dyads, threshold) 
     times = np.r_[[t_load], res['timings']] 
     print(list(zip(('load', 'precomp', 'max/min', 'ABCD', 'main'), times))) 
     print('TOTAL', times.sum()) 
     return res 
+0

Вау, который действительно впечатляет, мне нравится ваш подход. Я вижу, что res_indices возвращает список всех индексов, где он выше порогового значения. Как получить также номер результата для каждого индекса в том же массиве после того, где? – RaduS

+1

Вы можете использовать res_indices непосредственно на out' out [res_indices] 'дает вам количество точек, удовлетворяющих вашим критериям, на каждом смещении, где это число было 400 или более. Не могли бы вы проверить скрипт на некоторых ваших данных? Я подделал его к образцу, который вы разместили, но мне было бы интересно узнать, хорошо ли он работает и на других образцах. –

+0

Я протестировал сейчас довольно много вашего сценария на реальных данных, и скорость потрясающая, а точность на 100% правильная. Я получаю около 3 секунд на 1 мили. Это действительно впечатляет, учитывая тот факт, что он работает только на процессоре. Я очень доволен результатами сценария, хотя мне сложно понять все это :) – RaduS

 Смежные вопросы

  • Нет связанных вопросов^_^