Python: переписать математическую функцию numping numpy для работы на графическом процессоре

Может ли кто-нибудь помочь мне переписать эту функцию (функцию doTheMath) для выполнения вычислений на графическом процессоре? Я использовал несколько хороших дней, пытаясь окунуться в него, но никакого результата. Интересно, может быть, кто-нибудь может помочь мне переписать эту функцию так, как вам кажется, что это похоже на журнал, поскольку я даю тот же результат в конце. Я попытался использовать @jit из numba, но по какой-то причине он на самом деле намного медленнее, чем запуск кода, как обычно. С огромным размером выборки цель заключается в том, чтобы значительно сократить время выполнения, поэтому я считаю, что GPU - это самый быстрый способ сделать это.

Я немного объясню, что на самом деле происходит. Реальные данные, которые выглядят почти идентичными, как образцы данных, созданных в приведенном ниже коде, делятся на типовые размеры приблизительно 5.000.000 строк каждого образца или около 150 МБ на файл. Всего около 600.000.000 строк или 20 ГБ данных. Я должен прокручивать эти данные, образец по образцу, а затем строку за строкой в ​​каждом примере, брать последние 2000 (или другие) строки по каждой строке и запускать функцию doTheMath, которая возвращает результат. Затем этот результат сохраняется на жестком диске, где я могу сделать с ним другую работу. Как вы можете видеть ниже, мне не нужны все результаты всех строк, только те, которые больше определенной суммы. Если я запустил свою функцию, как сейчас, на python, я получаю около 62 секунд на 1.000.000 строк. Это очень долгое время, учитывая все данные и как быстро это сделать.

Я должен упомянуть, что я загружаю файл реальных данных по файлу в ОЗУ с помощью data = joblib.load(file), поэтому загрузка данных не является проблемой, так как занимает всего около 0,29 секунды на файл. После загрузки я запускаю весь код ниже. Наибольшее время занимает функция doTheMath. Я готов отдать все свои 500 очков репутации, которые у меня есть на stackoverflow, в награду за кого-то, кто хочет помочь мне переписать этот простой код для запуска на GPU. Я заинтересован в GPU, я действительно хочу посмотреть, как это делается по этой проблеме.

EDIT/UPDATE 1: Вот ссылка на небольшой образец реальных данных: data_csv.zip Около 102000 строк реальных данных1 и 2000 строк для реальных данных2a и data2b, Используйте minimumLimit = 400 для данных реальных образцов

EDIT/UPDATE 2: Ниже приведено краткое изложение ответов ниже. До сих пор у нас есть 4 ответа на исходное решение. Тот, который предлагает @Divakar, - это всего лишь хитрости к исходному коду. Из двух настроек только первая применима к этой проблеме, вторая - хорошая настройка, но здесь она не применяется. Из трех других ответов два из них - решения на основе процессоров, и один тензорный процессор GPU. Графический процессор Tensorflow от Paul Panzer кажется многообещающим, но когда я фактически запускаю его на графическом процессоре, он медленнее оригинала, поэтому код все еще нуждается в улучшении.

Другие два решения на базе процессора представлены @PaulPanzer (чистое решение numpy) и @MSeifert (решение numba). Оба решения дают очень хорошие результаты и обе технологические данные чрезвычайно быстрые по сравнению с исходным кодом. Из них один, представленный Полом Панцером, быстрее. Он обрабатывает около 1.000.000 строк за 3 секунды. Единственная проблема заключается в меньших размерах пакета, это можно преодолеть либо переключением на решение numba, предлагаемым MSeifert, либо даже исходным кодом после всех настроек, которые обсуждались ниже.

Я очень рад и благодарен @PaulPanzer и @MSeifert за работу, которую они сделали в своих ответах. Тем не менее, поскольку это вопрос о решении на базе GPU, я ожидаю, если кто-то захочет попробовать его в версии GPU и посмотреть, насколько быстрее данные будут обрабатываться на графическом процессоре по сравнению с текущим процессором решения. Если не будет других ответов, превосходящих чисто чистое решение @PaulPanzer, я приму его ответ как правильный и получаю награду:)

EDIT/UPDATE 3: @Divakar опубликовал новый ответ с решением для GPU. После моих тестов на реальных данных скорость даже не сравнима с решениями для сопоставления процессоров. Процессор GPU обрабатывает около 5.000.000 за 1,5 секунды. Это невероятно:) Я очень взволнован решением GPU, и я благодарю @Divakar за его публикацию. Также я благодарю @PaulPanzer и @MSeifert за их решения для процессоров:) Теперь мои исследования продолжаются с невероятной скоростью благодаря GPU:)

import pandas as pd
import numpy as np
import time

def doTheMath(tmpData1, data2a, data2b):
    A = tmpData1[:, 0]
    B = tmpData1[:,1]
    C = tmpData1[:,2]
    D = tmpData1[:,3]
    Bmax = B.max()
    Cmin  = C.min()
    dif = (Bmax - Cmin)
    abcd = ((((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
    return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

#Declare variables
batchSize = 2000
sampleSize = 5000000
resultArray = []
minimumLimit = 490 #use 400 on the real sample data 

#Create Random Sample Data
data1 = np.matrix(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.matrix(np.random.uniform(0, 1, (batchSize, 1))) #upper limit
data2b = np.matrix(np.random.uniform(0, 1, (batchSize, 1))) #lower limit
#approx. half of data2a will be smaller than data2b, but that is only in the sample data because it is randomly generated, NOT the real data. The real data2a is always higher than data2b.


#Loop through the data
t0 = time.time()
for rowNr in  range(data1.shape[0]):
    tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
    if(tmp_df.shape[0] == batchSize):
        result = doTheMath(tmp_df, data2a, data2b)
        if (result >= minimumLimit):
            resultArray.append([rowNr , result])
print('Runtime:', time.time() - t0)

#Save data results
resultArray = np.array(resultArray)
print(resultArray[:,1].sum())
resultArray = pd.DataFrame({'index':resultArray[:,0], 'result':resultArray[:,1]})
resultArray.to_csv("Result Array.csv", sep=';')

Спецификации ПК, над которыми я работаю:

GTX970(4gb) video card; 
i7-4790K CPU 4.00Ghz; 
16GB RAM;
a SSD drive 
running Windows 7; 

Как побочный вопрос, поможет ли вторая видеоплата в SLI по этой проблеме?

Ответы

Ответ 1

Введение и код решения

Ну, ты просил об этом! Таким образом, перечисленные в этом сообщении - это реализация с PyCUDA, в которой используются облегченные обертки, расширяющие большинство возможностей CUDA в пределах Python. Мы будем использовать SourceModule функциональность, которая позволяет писать и компилировать ядра CUDA, находящиеся в среде Python.

Попадая в проблему, среди рассматриваемых вычислений мы имеем скользящий максимум и минимум, немного различий, разделов и сравнений. Для максимальной и минимальной частей, которая включает в себя обнаружение максимального количества блоков (для каждого скользящего окна), мы будем использовать технику уменьшения, как обсуждалось более подробно here. Это будет сделано на уровне блоков. Для итераций верхнего уровня в раздвижных окнах мы будем использовать индексирование уровня сетки в ресурсы CUDA. Для получения дополнительной информации об этом блоке и формате сетки см. page-18. PyCUDA также поддерживает встроенные функции для вычисления сокращений, таких как max и min, но мы теряем контроль, особенно мы намерены использовать специализированную память, такую ​​как общая и постоянная память, для использования GPU на оптимальном уровне.

Вывод кода решения PyCUDA-NumPy -

1] часть PyCUDA -

import pycuda.autoinit
import pycuda.driver as drv
import numpy as np
from pycuda.compiler import SourceModule

mod = SourceModule("""
#define TBP 1024 // THREADS_PER_BLOCK

__device__ void get_Bmax_Cmin(float* out, float *d1, float *d2, int L, int offset)
{
    int tid = threadIdx.x;
    int inv = TBP;
    __shared__ float dS[TBP][2];

    dS[tid][0] = d1[tid+offset];  
    dS[tid][1] = d2[tid+offset];         
    __syncthreads();

    if(tid<L-TBP)  
    {
        dS[tid][0] = fmaxf(dS[tid][0] , d1[tid+inv+offset]);
        dS[tid][1] = fminf(dS[tid][1] , d2[tid+inv+offset]);
    }
    __syncthreads();
    inv = inv/2;

    while(inv!=0)   
    {
        if(tid<inv)
        {
            dS[tid][0] = fmaxf(dS[tid][0] , dS[tid+inv][0]);
            dS[tid][1] = fminf(dS[tid][1] , dS[tid+inv][1]);
        }
        __syncthreads();
        inv = inv/2;
    }
    __syncthreads();

    if(tid==0)
    {
        out[0] = dS[0][0];
        out[1] = dS[0][1];
    }   
    __syncthreads();
}

__global__ void main1(float* out, float *d0, float *d1, float *d2, float *d3, float *lowL, float *highL, int *BLOCKLEN)
{
    int L = BLOCKLEN[0];
    int tid = threadIdx.x;
    int iterID = blockIdx.x;
    float Bmax_Cmin[2];
    int inv;
    float Cmin, dif;   
    __shared__ float dS[TBP*2];   

    get_Bmax_Cmin(Bmax_Cmin, d1, d2, L, iterID);  
    Cmin = Bmax_Cmin[1];
    dif = (Bmax_Cmin[0] - Cmin);

    inv = TBP;

    dS[tid] = (d0[tid+iterID] + d1[tid+iterID] + d2[tid+iterID] + d3[tid+iterID] - 4.0*Cmin) / (4.0*dif);
    __syncthreads();

    if(tid<L-TBP)  
        dS[tid+inv] = (d0[tid+inv+iterID] + d1[tid+inv+iterID] + d2[tid+inv+iterID] + d3[tid+inv+iterID] - 4.0*Cmin) / (4.0*dif);                   

     dS[tid] = ((dS[tid] >= lowL[tid]) & (dS[tid] <= highL[tid])) ? 1 : 0;
     __syncthreads();

     if(tid<L-TBP)
         dS[tid] += ((dS[tid+inv] >= lowL[tid+inv]) & (dS[tid+inv] <= highL[tid+inv])) ? 1 : 0;
     __syncthreads();

    inv = inv/2;
    while(inv!=0)   
    {
        if(tid<inv)
            dS[tid] += dS[tid+inv];
        __syncthreads();
        inv = inv/2;
    }

    if(tid==0)
        out[iterID] = dS[0];
    __syncthreads();

}
""")

Обратите внимание, что THREADS_PER_BLOCK, TBP должен быть установлен на основе batchSize. Эмпирическое правило состоит в том, чтобы присвоить значение 2 для TBP, которое меньше, чем batchSize. Таким образом, для batchSize = 2000 нам понадобилось TBP как 1024.

2] Часть NumPy -

def gpu_app_v1(A, B, C, D, batchSize, minimumLimit):
    func1 = mod.get_function("main1")
    outlen = len(A)-batchSize+1

    # Set block and grid sizes
    BSZ = (1024,1,1)
    GSZ = (outlen,1)

    dest = np.zeros(outlen).astype(np.float32)
    N = np.int32(batchSize)
    func1(drv.Out(dest), drv.In(A), drv.In(B), drv.In(C), drv.In(D), \
                     drv.In(data2b), drv.In(data2a),\
                     drv.In(N), block=BSZ, grid=GSZ)
    idx = np.flatnonzero(dest >= minimumLimit)
    return idx, dest[idx]

Бенчмаркинг

Я тестировал GTX 960M. Обратите внимание, что PyCUDA ожидает, что массивы будут непрерывного порядка. Итак, нам нужно обрезать столбцы и делать копии. Я ожидаю/предполагаю, что данные могут быть прочитаны из файлов, так что данные распределяются по строкам, а не как столбцы. Таким образом, удерживая их от контрольной функции на данный момент.

Оригинальный подход -

def org_app(data1, batchSize, minimumLimit):
    resultArray = []
    for rowNr in  range(data1.shape[0]-batchSize+1):
        tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
        result = doTheMath(tmp_df, data2a, data2b)
        if (result >= minimumLimit):
            resultArray.append([rowNr , result]) 
    return resultArray

Сроки и проверка -

In [2]: #Declare variables
   ...: batchSize = 2000
   ...: sampleSize = 50000
   ...: resultArray = []
   ...: minimumLimit = 490 #use 400 on the real sample data
   ...: 
   ...: #Create Random Sample Data
   ...: data1 = np.random.uniform(1, 100000, (sampleSize + batchSize, 4)).astype(np.float32)
   ...: data2b = np.random.uniform(0, 1, (batchSize)).astype(np.float32)
   ...: data2a = data2b + np.random.uniform(0, 1, (batchSize)).astype(np.float32)
   ...: 
   ...: # Make column copies
   ...: A = data1[:,0].copy()
   ...: B = data1[:,1].copy()
   ...: C = data1[:,2].copy()
   ...: D = data1[:,3].copy()
   ...: 
   ...: gpu_out1,gpu_out2 = gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
   ...: cpu_out1,cpu_out2 = np.array(org_app(data1, batchSize, minimumLimit)).T
   ...: print(np.allclose(gpu_out1, cpu_out1))
   ...: print(np.allclose(gpu_out2, cpu_out2))
   ...: 
True
False

Итак, есть некоторые различия между подсчетами CPU и GPU. Пусть исследуют их -

In [7]: idx = np.flatnonzero(~np.isclose(gpu_out2, cpu_out2))

In [8]: idx
Out[8]: array([12776, 15208, 17620, 18326])

In [9]: gpu_out2[idx] - cpu_out2[idx]
Out[9]: array([-1., -1.,  1.,  1.])

Существует четыре экземпляра несоответствующих счетчиков. Они отключены с max на 1. После исследования я наткнулся на некоторые сведения об этом. В принципе, поскольку мы используем математические функции для вычислений max и min, и те, которые, я думаю, вызывают, что последний двоичный бит в плавающем представлении pt отличается от аналога процессора. Это называется ошибкой ULP и подробно обсуждается here и here.

Наконец, оставив проблему в стороне, позвольте получить самый важный бит, производительность -

In [10]: %timeit org_app(data1, batchSize, minimumLimit)
1 loops, best of 3: 2.18 s per loop

In [11]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
10 loops, best of 3: 82.5 ms per loop

In [12]: 2180.0/82.5
Out[12]: 26.424242424242426

Попробуйте с большими наборами данных. С sampleSize = 500000 получаем: <

In [14]: %timeit org_app(data1, batchSize, minimumLimit)
1 loops, best of 3: 23.2 s per loop

In [15]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
1 loops, best of 3: 821 ms per loop

In [16]: 23200.0/821
Out[16]: 28.25822168087698

Итак, ускорение остается постоянным со скоростью 27.

Ограничения:

1) Мы используем цифры float32, поскольку графические процессоры лучше всего работают с ними. Двойная точность, особенно на несерверных графических процессорах, не очень популярна, когда дело доходит до производительности, и поскольку вы работаете с таким GPU, я тестировал с помощью float32.

Дальнейшее улучшение:

1) Мы могли бы использовать constant memory быстрее data2a и data2b вместо использования global memory.

Ответ 2

Tweak # 1

Обычно он советует векторизовать вещи при работе с массивами NumPy. Но с очень большими массивами, я думаю, что у вас нет вариантов. Таким образом, чтобы повысить производительность, можно сделать небольшую настройку на последнем этапе суммирования.

Мы могли бы заменить шаг, который делает массив 1s и 0s, и суммирует:

np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

с np.count_nonzero, который эффективно работает, чтобы подсчитать значения True в булевом массиве, вместо преобразования в 1s и 0s -

np.count_nonzero((abcd <= data2a) & (abcd >= data2b))

Тест времени выполнения -

In [45]: abcd = np.random.randint(11,99,(10000))

In [46]: data2a = np.random.randint(11,99,(10000))

In [47]: data2b = np.random.randint(11,99,(10000))

In [48]: %timeit np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()
10000 loops, best of 3: 81.8 µs per loop

In [49]: %timeit np.count_nonzero((abcd <= data2a) & (abcd >= data2b))
10000 loops, best of 3: 28.8 µs per loop

Tweak # 2

Используйте предварительно вычисленный ответный ответ, когда имеете дело с случаями, которые подвергаются неявному вещанию. Дополнительная информация here. Таким образом, сохраняйте обратную сторону dif и используйте это вместо этого на шаге:

((((A  - Cmin) / dif) + ((B  - Cmin) / dif) + ...

Пример теста -

In [52]: A = np.random.rand(10000)

In [53]: dif = 0.5

In [54]: %timeit A/dif
10000 loops, best of 3: 25.8 µs per loop

In [55]: %timeit A*(1.0/dif)
100000 loops, best of 3: 7.94 µs per loop

У вас есть четыре места, использующих деление на dif. Так что, надеюсь, это тоже принесло бы заметный импульс!

Ответ 3

Прежде чем приступать к настройке целевого (GPU) или использования чего-либо еще (например, параллельных исполнений), вы можете подумать о том, как улучшить уже существующий код. Вы использовали numba -tag, так что я' ll использовать его для улучшения кода: сначала мы работаем на массивах не на матрицах:

data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit
data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit

Каждый раз, когда вы вызываете doTheMath, вы ожидаете целое число назад, однако вы используете множество массивов и создаете много промежуточных массивов:

abcd = ((((A  - Cmin) / dif) + ((B  - Cmin) / dif) + ((C   - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

Это создает промежуточный массив на каждом шаге:

  • tmp1 = A-Cmin,
  • tmp2 = tmp1 / dif,
  • tmp3 = B - Cmin,
  • tmp4 = tmp3 / dif
  • ... вы получаете суть.

Однако это функция сокращения (array → integer), поэтому наличие большого количества промежуточных массивов - лишний вес, просто вычислите значение "fly".

import numba as nb

@nb.njit
def doTheMathNumba(tmpData, data2a, data2b):
    Bmax = np.max(tmpData[:, 1])
    Cmin = np.min(tmpData[:, 2])
    diff = (Bmax - Cmin)
    idiff = 1 / diff
    sum_ = 0
    for i in range(tmpData.shape[0]):
        val = (tmpData[i, 0] + tmpData[i, 1] + tmpData[i, 2] + tmpData[i, 3]) / 4 * idiff - Cmin * idiff
        if val <= data2a[i] and val >= data2b[i]:
            sum_ += 1
    return sum_

Я сделал что-то еще, чтобы избежать нескольких операций:

(((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4
= ((A - Cmin + B - Cmin + C - Cmin + D - Cmin) / dif) / 4
= (A + B + C + D - 4 * Cmin) / (4 * dif)
= (A + B + C + D) / (4 * dif) - (Cmin / dif)

Это фактически сокращает время выполнения почти на 10% на моем компьютере:

%timeit doTheMath(tmp_df, data2a, data2b)       # 1000 loops, best of 3: 446 µs per loop
%timeit doTheMathNumba(tmp_df, data2a, data2b)  # 10000 loops, best of 3: 59 µs per loop

Конечно, есть и другие улучшения, например, используя min/max для вычисления Bmax и Cmin, что сделало бы по меньшей мере часть прогона вычисления в O(sampleSize) вместо O(samplesize * batchsize). Это также позволило бы повторно использовать некоторые из вычислений (A + B + C + D) / (4 * dif) - (Cmin / dif), потому что если Cmin и Bmax не изменяются для следующего образца, эти значения не отличаются. Это немного сложно сделать, потому что сравнения различаются. Но определенно возможно! См. Здесь:

import time
import numpy as np
import numba as nb

@nb.njit
def doTheMathNumba(abcd, data2a, data2b, Bmax, Cmin):
    diff = (Bmax - Cmin)
    idiff = 1 / diff
    quarter_idiff = 0.25 * idiff
    sum_ = 0
    for i in range(abcd.shape[0]):
        val = abcd[i] * quarter_idiff - Cmin * idiff
        if val <= data2a[i] and val >= data2b[i]:
            sum_ += 1
    return sum_

@nb.njit
def doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, resultArray):
    found = 0
    for rowNr in range(data1.shape[0]):
        if(abcd[rowNr:rowNr + batchSize].shape[0] == batchSize):
            result = doTheMathNumba(abcd[rowNr:rowNr + batchSize], 
                                    data2a, data2b, Bmaxs[rowNr], Cmins[rowNr])
            if (result >= minimumLimit):
                resultArray[found, 0] = rowNr
                resultArray[found, 1] = result
                found += 1
    return resultArray[:found]

#Declare variables
batchSize = 2000
sampleSize = 50000
resultArray = []
minimumLimit = 490 #use 400 on the real sample data 

data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit
data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit

from scipy import ndimage
t0 = time.time()
abcd = np.sum(data1, axis=1)
Bmaxs = ndimage.maximum_filter1d(data1[:, 1], 
                                 size=batchSize, 
                                 origin=-((batchSize-1)//2-1))  # correction for even shapes
Cmins = ndimage.minimum_filter1d(data1[:, 2], 
                                 size=batchSize, 
                                 origin=-((batchSize-1)//2-1))

result = np.zeros((sampleSize, 2), dtype=np.int64)
doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, result)
print('Runtime:', time.time() - t0)

Это дает мне Runtime: 0.759593152999878 (после того, как numba скомпилировал функции!), а ваш оригинал взял Runtime: 24.68975639343262. Теперь мы в 30 раз быстрее!

С вашим размером выборки он по-прежнему занимает Runtime: 60.187848806381226, но это не так уж плохо, правильно?

И даже если я сам этого не сделал, говорит, что можно написать "Numba для графических процессоров CUDA" , и это не кажется сложным.

Ответ 4

Вот некоторый код, чтобы продемонстрировать, что можно, просто настроив алгоритм. Это чисто numpy, но на данных образца, которые вы опубликовали, дает примерно 35-кратное ускорение по сравнению с исходной версией (~ 1,000,000 выборок в ~ 2.5 сек на моей довольно скромной машине):

>>> result_dict = master('run')
[('load', 0.82578349113464355), ('precomp', 0.028138399124145508), ('max/min', 0.24333405494689941), ('ABCD', 0.015314102172851562), ('main', 1.3356468677520752)]
TOTAL 2.44821691513

Используемые твики:

  • A + B + C + D, см. мой другой ответ
  • работает min/max, в том числе избегая вычислить (A + B + C + D - 4Cmin)/(4dif) несколько раз с тем же Cmin/dif.

Это более или менее обычные. Это оставляет сравнение с data2a/b, которое является дорогостоящим O (NK), где N - количество выборок, а K - размер окна. Здесь можно использовать относительно хорошие данные. Используя min/max для запуска, вы можете создавать варианты data2a/b, которые могут быть использованы для одновременного тестирования диапазона сдвигов окон, если тест не удался, все эти смещения могут быть немедленно исключены, в противном случае диапазон делит пополам.

import numpy as np
import time

# global variables; they will hold the precomputed pre-screening filters
preA, preB = {}, {}
CHUNK_SIZES = None

def sliding_argmax(data, K=2000):
    """compute the argmax of data over a sliding window of width K

    returns:
        indices  -- indices into data
        switches -- window offsets at which the maximum changes
                    (strictly speaking: where the index of the maximum changes)
                    excludes 0 but includes maximum offset (len(data)-K+1)

    see last line of compute_pre_screening_filter for a recipe to convert
    this representation to the vector of maxima
    """
    N = len(data)
    last = np.argmax(data[:K])
    indices = [last]
    while indices[-1] <= N - 1:
        ge = np.where(data[last + 1 : last + K + 1] > data[last])[0]
        if len(ge) == 0:
            if last + K >= N:
                break
            last += 1 + np.argmax(data[last + 1 : last + K + 1])
            indices.append(last)
        else:
            last += 1 + ge[0]
            indices.append(last)
    indices = np.array(indices)
    switches = np.where(data[indices[1:]] > data[indices[:-1]],
                        indices[1:] + (1-K), indices[:-1] + 1)
    return indices, np.r_[switches, [len(data)-K+1]]


def compute_pre_screening_filter(bound, n_offs):
    """compute pre-screening filter for point-wise upper bound

    given a K-vector of upper bounds B and K+n_offs-1-vector data
    compute K+n_offs-1-vector filter such that for each index j
    if for any offset 0 <= o < n_offs and index 0 <= i < K such that
    o + i = j, the inequality B_i >= data_j holds then filter_j >= data_j

    therefore the number of data points below filter is an upper bound for
    the maximum number of points below bound in any K-window in data
    """
    pad_l, pad_r = np.min(bound[:n_offs-1]), np.min(bound[1-n_offs:])
    padded = np.r_[pad_l+np.zeros(n_offs-1,), bound, pad_r+np.zeros(n_offs-1,)]
    indices, switches = sliding_argmax(padded, n_offs)
    return padded[indices].repeat(np.diff(np.r_[[0], switches]))


def compute_all_pre_screening_filters(upper, lower, min_chnk=5, dyads=6):
    """compute upper and lower pre-screening filters for data blocks of
    sizes K+n_offs-1 where
    n_offs = min_chnk, 2min_chnk, ..., 2^(dyads-1)min_chnk

    the result is stored in global variables preA and preB
    """
    global CHUNK_SIZES

    CHUNK_SIZES = min_chnk * 2**np.arange(dyads)
    preA[1] = upper
    preB[1] = lower
    for n in CHUNK_SIZES:
        preA[n] = compute_pre_screening_filter(upper, n)
        preB[n] = -compute_pre_screening_filter(-lower, n)


def test_bounds(block, counts, threshold=400):
    """test whether the windows fitting in the data block 'block' fall
    within the bounds using pre-screening for efficient bulk rejection

    array 'counts' will be overwritten with the counts of compliant samples
    note that accurate counts will only be returned for above threshold
    windows, because the analysis of bulk rejected windows is short-circuited

    also note that bulk rejection only works for 'well behaved' data and
    for example not on random numbers
    """
    N = len(counts)
    K = len(preA[1])
    r = N % CHUNK_SIZES[0]
    # chop up N into as large as possible chunks with matching pre computed
    # filters
    # start with small and work upwards
    counts[:r] = [np.count_nonzero((block[l:l+K] <= preA[1]) &
                                   (block[l:l+K] >= preB[1]))
                  for l in range(r)]

    def bisect(block, counts):
        M = len(counts)
        cnts = np.count_nonzero((block <= preA[M]) & (block >= preB[M]))
        if cnts < threshold:
            counts[:] = cnts
            return
        elif M == CHUNK_SIZES[0]:
            counts[:] = [np.count_nonzero((block[l:l+K] <= preA[1]) &
                                          (block[l:l+K] >= preB[1]))
                         for l in range(M)]
        else:
            M //= 2
            bisect(block[:-M], counts[:M])
            bisect(block[M:], counts[M:])

    N = N // CHUNK_SIZES[0]
    for M in CHUNK_SIZES:
        if N % 2:
            bisect(block[r:r+M+K-1], counts[r:r+M])
            r += M
        elif N == 0:
            return
        N //= 2
    else:
        for j in range(2*N):
            bisect(block[r:r+M+K-1], counts[r:r+M])
            r += M


def analyse(data, use_pre_screening=True, min_chnk=5, dyads=6,
            threshold=400):
    samples, upper, lower = data
    N, K = samples.shape[0], upper.shape[0]
    times = [time.time()]
    if use_pre_screening:
        compute_all_pre_screening_filters(upper, lower, min_chnk, dyads)
    times.append(time.time())
    # compute switching points of max and min for running normalisation
    upper_inds, upper_swp = sliding_argmax(samples[:, 1], K)
    lower_inds, lower_swp = sliding_argmax(-samples[:, 2], K)
    times.append(time.time())
    # sum columns
    ABCD = samples.sum(axis=-1)
    times.append(time.time())
    counts = np.empty((N-K+1,), dtype=int)
    # main loop
    # loop variables:
    offs = 0
    u_ind, u_scale, u_swp = 0, samples[upper_inds[0], 1], upper_swp[0]
    l_ind, l_scale, l_swp = 0, samples[lower_inds[0], 2], lower_swp[0]
    while True:
        # check which is switching next, min(C) or max(B)
        if u_swp > l_swp:
            # greedily take the largest block possible such that dif and Cmin
            # do not change
            block = (ABCD[offs:l_swp+K-1] - 4*l_scale) \
                    * (0.25 / (u_scale-l_scale))
            if use_pre_screening:
                test_bounds(block, counts[offs:l_swp], threshold=threshold)
            else:
                counts[offs:l_swp] = [
                    np.count_nonzero((block[l:l+K] <= upper) &
                                     (block[l:l+K] >= lower))
                    for l in range(l_swp - offs)]
            # book keeping
            l_ind += 1
            offs = l_swp
            l_swp = lower_swp[l_ind]
            l_scale = samples[lower_inds[l_ind], 2]
        else:
            block = (ABCD[offs:u_swp+K-1] - 4*l_scale) \
                    * (0.25 / (u_scale-l_scale))
            if use_pre_screening:
                test_bounds(block, counts[offs:u_swp], threshold=threshold)
            else:
                counts[offs:u_swp] = [
                    np.count_nonzero((block[l:l+K] <= upper) &
                                     (block[l:l+K] >= lower))
                    for l in range(u_swp - offs)]
            u_ind += 1
            if u_ind == len(upper_inds):
                assert u_swp == N-K+1
                break
            offs = u_swp
            u_swp = upper_swp[u_ind]
            u_scale = samples[upper_inds[u_ind], 1]
    times.append(time.time())
    return {'counts': counts, 'valid': np.where(counts >= 400)[0],
            'timings': np.diff(times)}


def master(mode='calibrate', data='fake', use_pre_screening=True, nrep=3,
           min_chnk=None, dyads=None):
    t = time.time()
    if data in ('fake', 'load'):
        data1 = np.loadtxt('data1.csv', delimiter=';', skiprows=1,
                           usecols=[1,2,3,4])
        data2a = np.loadtxt('data2a.csv', delimiter=';', skiprows=1,
                            usecols=[1])
        data2b = np.loadtxt('data2b.csv', delimiter=';', skiprows=1,
                            usecols=[1])
        if data == 'fake':
            data1 = np.tile(data1, (10, 1))
        threshold = 400
    elif data == 'random':
        data1 = np.random.random((102000, 4))
        data2b = np.random.random(2000)
        data2a = np.random.random(2000)
        threshold = 490
        if use_pre_screening or mode == 'calibrate':
            print('WARNING: pre-screening not efficient on artificial data')
    else:
        raise ValueError("data mode {} not recognised".format(data))
    data = data1, data2a, data2b
    t_load = time.time() - t
    if mode == 'calibrate':
        min_chnk = (2, 3, 4, 5, 6) if min_chnk is None else min_chnk
        dyads = (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) if dyads is None else dyads
        timings = np.zeros((len(min_chnk), len(dyads)))
        print('max bisect  ' + ' '.join([
            '   n.a.' if dy == 0 else '{:7d}'.format(dy) for dy in dyads]),
              end='')
        for i, mc in enumerate(min_chnk):
            print('\nmin chunk {}'.format(mc), end=' ')
            for j, dy in enumerate(dyads):
                for k in range(nrep):
                    if dy == 0: # no pre-screening
                        timings[i, j] += analyse(
                            data, False, mc, dy, threshold)['timings'][3]
                    else:
                        timings[i, j] += analyse(
                            data, True, mc, dy, threshold)['timings'][3]
                timings[i, j] /= nrep
                print('{:7.3f}'.format(timings[i, j]), end=' ', flush=True)
        best_mc, best_dy = np.unravel_index(np.argmin(timings.ravel()),
                                            timings.shape)
        print('\nbest', min_chnk[best_mc], dyads[best_dy])
        return timings, min_chnk[best_mc], dyads[best_dy]
    if mode == 'run':
        min_chnk = 2 if min_chnk is None else min_chnk
        dyads = 5 if dyads is None else dyads
        res = analyse(data, use_pre_screening, min_chnk, dyads, threshold)
        times = np.r_[[t_load], res['timings']]
        print(list(zip(('load', 'precomp', 'max/min', 'ABCD', 'main'), times)))
        print('TOTAL', times.sum())
        return res

Ответ 5

Это технически не по теме (не GPU), но я уверен, что вас это заинтересует.

Существует одна очевидная и довольно большая экономия:

Precompute A + B + C + D (не в цикле, по всем данным: data1.sum(axis=-1)), потому что abcd = ((A+B+C+D) - 4Cmin) / (4dif). Это сэкономит немало операционных систем.

Удивленный никто не заметил этого раньше, -)

Edit:

Есть еще одна вещь, хотя я подозреваю, что только в вашем примере, а не в ваших реальных данных:

В то время как примерно половина data2a будет меньше, чем data2b. В этих местах ваши условия на abcd не могут быть истинными, поэтому вам даже не нужно вычислять abcd.

Edit:

Еще одна настройка, которую я использовал ниже, но забыл упомянуть: если вы вычисляете max (или min) над движущимся окном. Когда вы перемещаете одну точку вправо, скажите, насколько вероятен максимум для изменения? Есть только две вещи, которые могут изменить его: новая точка справа больше (происходит примерно один раз в период времени окна, и даже если это происходит, вы сразу же знаете новый максимум), или старый максимум падает с окна  слева (также случается примерно один раз в период времени окна). Только в этом последнем случае вам нужно искать все окно для нового max.

Edit:

Не смог удержаться от попытки попробовать в тензорном потоке. У меня нет GPU, поэтому вы сами должны проверить его на скорость. Поместите "gpu" для "cpu" на отмеченную строку.

В cpu он примерно в два раза быстрее, чем исходная реализация (т.е. без настроек Divakar). Обратите внимание, что я позволил изменить входы от матрицы к простому массиву. В настоящее время tensorflow - это немного движущаяся цель, поэтому убедитесь, что у вас есть правильная версия. Я использовал Python3.6 и tf 0.12.1 Если вы делаете pip3 install tensorflow-gpu, сегодня он должен должен работать.

import numpy as np
import time
import tensorflow as tf

# currently the max/min code is sequential
# thus
parallel_iterations = 1
# but you can put this in a separate loop, precompute and then try and run
# the remainder of doTheMathTF with a larger parallel_iterations

# tensorflow is quite capricious about its data types
ddf = tf.float64
ddi = tf.int32

def worker(data1, data2a, data2b):
    ###################################
    # CHANGE cpu to gpu in next line! #
    ###################################
    with tf.device('/cpu:0'):
        g = tf.Graph ()
        with g.as_default():
            ABCD = tf.constant(data1.sum(axis=-1), dtype=ddf)
            B = tf.constant(data1[:, 1], dtype=ddf)
            C = tf.constant(data1[:, 2], dtype=ddf)
            window = tf.constant(len(data2a))
            N = tf.constant(data1.shape[0] - len(data2a) + 1, dtype=ddi)
            data2a = tf.constant(data2a, dtype=ddf)
            data2b = tf.constant(data2b, dtype=ddf)
            def doTheMathTF(i, Bmax, Bmaxind, Cmin, Cminind, out):
                # most of the time we can keep the old max/min
                Bmaxind = tf.cond(Bmaxind<i,
                                  lambda: i + tf.to_int32(
                                      tf.argmax(B[i:i+window], axis=0)),
                                  lambda: tf.cond(Bmax>B[i+window-1], 
                                                  lambda: Bmaxind, 
                                                  lambda: i+window-1))
                Cminind = tf.cond(Cminind<i,
                                  lambda: i + tf.to_int32(
                                      tf.argmin(C[i:i+window], axis=0)),
                                  lambda: tf.cond(Cmin<C[i+window-1],
                                                  lambda: Cminind,
                                                  lambda: i+window-1))
                Bmax = B[Bmaxind]
                Cmin = C[Cminind]
                abcd = (ABCD[i:i+window] - 4 * Cmin) * (1 / (4 * (Bmax-Cmin)))
                out = out.write(i, tf.to_int32(
                    tf.count_nonzero(tf.logical_and(abcd <= data2a,
                                                    abcd >= data2b))))
                return i + 1, Bmax, Bmaxind, Cmin, Cminind, out
            with tf.Session(graph=g) as sess:
                i, Bmaxind, Bmax, Cminind, Cmin, out = tf.while_loop(
                    lambda i, _1, _2, _3, _4, _5: i<N, doTheMathTF,
                    (tf.Variable(0, dtype=ddi), tf.Variable(0.0, dtype=ddf),
                     tf.Variable(-1, dtype=ddi),
                     tf.Variable(0.0, dtype=ddf), tf.Variable(-1, dtype=ddi),
                     tf.TensorArray(ddi, size=N)),
                    shape_invariants=None,
                    parallel_iterations=parallel_iterations,
                    back_prop=False)
                out = out.pack()
                sess.run(tf.initialize_all_variables())
                out, = sess.run((out,))
    return out

#Declare variables
batchSize = 2000
sampleSize = 50000#00
resultArray = []

#Create Sample Data
data1 = np.random.uniform(1, 100, (sampleSize + batchSize, 4))
data2a = np.random.uniform(0, 1, (batchSize,))
data2b = np.random.uniform(0, 1, (batchSize,))

t0 = time.time()
out = worker(data1, data2a, data2b)
print('Runtime (tensorflow):', time.time() - t0)


good_indices, = np.where(out >= 490)
res_tf = np.c_[good_indices, out[good_indices]]

def doTheMath(tmpData1, data2a, data2b):
    A = tmpData1[:, 0]
    B  = tmpData1[:,1]
    C   = tmpData1[:,2]
    D = tmpData1[:,3]
    Bmax = B.max()
    Cmin  = C.min()
    dif = (Bmax - Cmin)
    abcd = ((((A  - Cmin) / dif) + ((B  - Cmin) / dif) + ((C   - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
    return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

#Loop through the data
t0 = time.time()
for rowNr in  range(sampleSize+1):
    tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
    result = doTheMath(tmp_df, data2a, data2b)
    if (result >= 490):
        resultArray.append([rowNr , result])
print('Runtime (original):', time.time() - t0)
print(np.alltrue(np.array(resultArray)==res_tf))