Свертка двух трехмерных массивов с дополнением с одной стороны слишком медленная
В моем текущем проекте мне нужно " свертить" два трехмерных массива несколько необычным способом:
Предположим, что мы имеем два трехмерных массива A и B с размерами dimA и dimB (одинаковые для каждой оси). Теперь мы хотим создать третий массив C с размерами dimA + dimB для каждой оси.
Записи C вычисляются как:
c_{x1+x2,y1+y2,z1+z2} += a_{x1,y1,z1} * b_{x2,y2,z2}
Моя текущая версия проста:
dimA = A.shape[0]
dimB = B.shape[0]
dimC = dimA+dimB
C = np.zeros((dimC,dimC,dimC))
for x1 in range(dimA):
for x2 in range(dimB):
for y1 in range(dimA):
for y2 in range(dimB):
for z1 in range(dimA):
for z2 in range(dimB):
x = x1+x2
y = y1+y2
z = z1+z2
C[x,y,z] += A[x1,y1,z1] * B[x2,y2,z2]
К сожалению, эта версия очень медленная и непригодна для использования.
Моя вторая версия была:
C = scipy.signal.fftconvolve(A,B,mode="full")
Но это вычисляет только элементы max(dimA,dimB)
У кого-нибудь есть идея?
Ответы
Ответ 1
Вы пытались использовать Numba? Это пакет, который позволяет вам обернуть код Python, который обычно медленный с JIT-компилятором. Я быстро ударил по вашей проблеме с помощью Numba и значительно ускорился. Используя магическую функцию magicit timeit
для IPython, функция custom_convolution
заняла ~ 18 с, а оптимизированная функция Numba заняла 10,4 мс. Это ускорение более чем на 1700.
Здесь реализована функция Numba.
import numpy as np
from numba import jit, double
s = 15
array_a = np.random.rand(s ** 3).reshape(s, s, s)
array_b = np.random.rand(s ** 3).reshape(s, s, s)
# Original code
def custom_convolution(A, B):
dimA = A.shape[0]
dimB = B.shape[0]
dimC = dimA + dimB
C = np.zeros((dimC, dimC, dimC))
for x1 in range(dimA):
for x2 in range(dimB):
for y1 in range(dimA):
for y2 in range(dimB):
for z1 in range(dimA):
for z2 in range(dimB):
x = x1 + x2
y = y1 + y2
z = z1 + z2
C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2]
return C
# Numba'ing the function with the JIT compiler
fast_convolution = jit(double[:, :, :](double[:, :, :],
double[:, :, :]))(custom_convolution)
Если вы вычисляете остатки между результатами обеих функций, вы получите нули. Это означает, что реализация JIT работает без проблем.
slow_result = custom_convolution(array_a, array_b)
fast_result = fast_convolution(array_a, array_b)
print np.max(np.abs(slow_result - fast_result))
Выход, который я получаю для этого, 0.0
.
Вы можете установить Numba в текущую настройку Python или быстро выполнить ее с помощью пакета AnacondaCE из continuum.io.
И последнее, но не менее важное: функция Numba быстрее, чем функция scipy.signal.fftconvolve
, в несколько раз.
Примечание. Я использую Anaconda, а не AnacondaCE. Есть несколько отличий между двумя пакетами для производительности Numba, но я не думаю, что это будет сильно отличаться.
Ответ 2
Метод Numba выше - это аккуратный трюк, но это будет только преимуществом для относительно небольшого алгоритма N. Алгоритм O (N ^ 6) убьет вас каждый раз, независимо от того, насколько быстро он выполняется. В моих тестах метод fftconvolve
пересекается быстрее, чем N = 20, а по N = 32 - в 10 раз быстрее. Оставив определение custom_convolution
выше:
from timeit import Timer
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import fftconvolve
from numba import jit, double
# Numba'ing the function with the JIT compiler
numba_convolution = jit(double[:, :, :](double[:, :, :],
double[:, :, :]))(custom_convolution)
def fft_convolution(A, B):
return fftconvolve(A, B, mode='full')
if __name__ == '__main__':
reps = 3
nt, ft = [], []
x = range(2, 34)
for N in x:
print N
A = np.random.rand(N, N, N)
B = np.random.rand(N, N, N)
C1 = numba_convolution(A, B)
C2 = fft_convolution(A, B)
assert np.allclose(C1[:-1, :-1, :-1], C2)
t = Timer(lambda: numba_convolution(A, B))
nt.append(t.timeit(number=reps))
t = Timer(lambda: fft_convolution(A, B))
ft.append(t.timeit(number=reps))
plt.plot(x, ft, x, nt)
plt.show()
Он также очень зависит от N, поскольку FFT будет намного быстрее для степеней 2. Время для версии FFT по существу постоянное для N = 17 до N = 32, и оно еще быстрее при N = 33, где он начинает быстро расходиться.
Вы можете попробовать обернуть реализацию FFT в Numba, но вы не можете сделать это напрямую со скудной версией.
(Извините, что создал новый ответ, но у меня нет точек для ответа напрямую.)
Ответ 3
Общее правило заключается в использовании правильного алгоритма для задания, которое, если ядро свертки не является коротким по сравнению с данными, является сверткой на основе БПФ (кратковременно означает меньше log2 (n), где n - длина данные).
В вашем случае, поскольку вы свертываете два набора данных одинакового размера, вы, вероятно, захотите рассмотреть свертку на основе FFT.
Ясно, что scipy.signal.fftconvolve
в этом отношении недостаточно. Используя более быстрый алгоритм БПФ, вы можете сделать гораздо лучше, скопировав свою собственную процедуру свертки (это не помогает тому, что fftconvolve заставляет размер преобразования равным двум, в противном случае это может быть обезглавлено обезьяной).
В следующем коде используется pyfftw, мои обертки вокруг FFTW и создает пользовательский класс свертки CustomFFTConvolution
:
class CustomFFTConvolution(object):
def __init__(self, A, B, threads=1):
shape = (np.array(A.shape) + np.array(B.shape))-1
if np.iscomplexobj(A) and np.iscomplexobj(B):
self.fft_A_obj = pyfftw.builders.fftn(
A, s=shape, threads=threads)
self.fft_B_obj = pyfftw.builders.fftn(
B, s=shape, threads=threads)
self.ifft_obj = pyfftw.builders.ifftn(
self.fft_A_obj.get_output_array(), s=shape,
threads=threads)
else:
self.fft_A_obj = pyfftw.builders.rfftn(
A, s=shape, threads=threads)
self.fft_B_obj = pyfftw.builders.rfftn(
B, s=shape, threads=threads)
self.ifft_obj = pyfftw.builders.irfftn(
self.fft_A_obj.get_output_array(), s=shape,
threads=threads)
def __call__(self, A, B):
fft_padded_A = self.fft_A_obj(A)
fft_padded_B = self.fft_B_obj(B)
return self.ifft_obj(fft_padded_A * fft_padded_B)
Это используется как:
custom_fft_conv = CustomFFTConvolution(A, B)
C = custom_fft_conv(A, B) # This can contain different values to during construction
с необязательным аргументом threads
при построении класса. Цель создания класса - извлечь выгоду из способности FFTW заранее планировать преобразование.
Полный демо-код ниже просто расширяет @Kelsey answer за время и т.д.
Ускорение существенно по сравнению с решением numba и решением ванили fftconvolve. При n = 33 он примерно на 40-45 раз быстрее, чем оба.
from timeit import Timer
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import fftconvolve
from numba import jit, double
import pyfftw
# Original code
def custom_convolution(A, B):
dimA = A.shape[0]
dimB = B.shape[0]
dimC = dimA + dimB
C = np.zeros((dimC, dimC, dimC))
for x1 in range(dimA):
for x2 in range(dimB):
for y1 in range(dimA):
for y2 in range(dimB):
for z1 in range(dimA):
for z2 in range(dimB):
x = x1 + x2
y = y1 + y2
z = z1 + z2
C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2]
return C
# Numba'ing the function with the JIT compiler
numba_convolution = jit(double[:, :, :](double[:, :, :],
double[:, :, :]))(custom_convolution)
def fft_convolution(A, B):
return fftconvolve(A, B, mode='full')
class CustomFFTConvolution(object):
def __init__(self, A, B, threads=1):
shape = (np.array(A.shape) + np.array(B.shape))-1
if np.iscomplexobj(A) and np.iscomplexobj(B):
self.fft_A_obj = pyfftw.builders.fftn(
A, s=shape, threads=threads)
self.fft_B_obj = pyfftw.builders.fftn(
B, s=shape, threads=threads)
self.ifft_obj = pyfftw.builders.ifftn(
self.fft_A_obj.get_output_array(), s=shape,
threads=threads)
else:
self.fft_A_obj = pyfftw.builders.rfftn(
A, s=shape, threads=threads)
self.fft_B_obj = pyfftw.builders.rfftn(
B, s=shape, threads=threads)
self.ifft_obj = pyfftw.builders.irfftn(
self.fft_A_obj.get_output_array(), s=shape,
threads=threads)
def __call__(self, A, B):
fft_padded_A = self.fft_A_obj(A)
fft_padded_B = self.fft_B_obj(B)
return self.ifft_obj(fft_padded_A * fft_padded_B)
def run_test():
reps = 10
nt, ft, cft, cft2 = [], [], [], []
x = range(2, 34)
for N in x:
print N
A = np.random.rand(N, N, N)
B = np.random.rand(N, N, N)
custom_fft_conv = CustomFFTConvolution(A, B)
custom_fft_conv_nthreads = CustomFFTConvolution(A, B, threads=2)
C1 = numba_convolution(A, B)
C2 = fft_convolution(A, B)
C3 = custom_fft_conv(A, B)
C4 = custom_fft_conv_nthreads(A, B)
assert np.allclose(C1[:-1, :-1, :-1], C2)
assert np.allclose(C1[:-1, :-1, :-1], C3)
assert np.allclose(C1[:-1, :-1, :-1], C4)
t = Timer(lambda: numba_convolution(A, B))
nt.append(t.timeit(number=reps))
t = Timer(lambda: fft_convolution(A, B))
ft.append(t.timeit(number=reps))
t = Timer(lambda: custom_fft_conv(A, B))
cft.append(t.timeit(number=reps))
t = Timer(lambda: custom_fft_conv_nthreads(A, B))
cft2.append(t.timeit(number=reps))
plt.plot(x, ft, label='scipy.signal.fftconvolve')
plt.plot(x, nt, label='custom numba convolve')
plt.plot(x, cft, label='custom pyfftw convolve')
plt.plot(x, cft2, label='custom pyfftw convolve with threading')
plt.legend()
plt.show()
if __name__ == '__main__':
run_test()
РЕДАКТИРОВАТЬ: Более свежий scipy делает лучшую работу не всегда для заполнения до 2 длины, поэтому ближе к выходу к случаю pyFFTW.