Быстрая интерполяция по трехмерному массиву
У меня есть трехмерный массив, который мне нужно интерполировать по одной оси (последнее измерение). Скажем y.shape = (nx, ny, nz)
, я хочу интерполировать в nz
для каждого (nx, ny)
. Тем не менее, я хочу, чтобы интерполяция для другого значения в каждом [i, j]
.
Вот пример кода. Если бы я хотел интерполировать на одно значение, скажем new_z
, я бы использовал scipy.interpolate.interp1d
, как этот
# y is a 3D ndarray
# x is a 1D ndarray with the abcissa values
# new_z is a number
f = scipy.interpolate.interp1d(x, y, axis=-1, kind='linear')
result = f(new_z)
Однако для этой проблемы я действительно хочу, чтобы интерполяция к другому new_z
для каждого y[i, j]
. Поэтому я делаю это:
# y is a 3D ndarray
# x is a 1D ndarray with the abcissa values
# new_z is a 2D array
result = numpy.empty(y.shape[:-1])
for i in range(nx):
for j in range(ny):
f = scipy.interpolate.interp1d(x, y[i, j], axis=-1, kind='linear')
result[i, j] = f(new_z[i, j])
К сожалению, с несколькими циклами это становится неэффективным и медленным. Есть ли лучший способ сделать такую интерполяцию? Достаточна линейная интерполяция. Возможность реализовать это в Cython, но я пытался избежать этого, потому что я хочу иметь гибкость перехода на кубическую интерполяцию и не хочу делать это вручную в Cython.
Ответы
Ответ 1
Чтобы ускорить интерполяцию высокого порядка, вы можете вызывать interp1d()
только один раз, а затем использовать атрибут _spline
и функцию низкого уровня _bspleval()
в модуле _fitpack
. Вот код:
from scipy.interpolate import interp1d
import numpy as np
nx, ny, nz = 30, 40, 50
x = np.arange(0, nz, 1.0)
y = np.random.randn(nx, ny, nz)
new_x = np.random.random_integers(1, (nz-1)*10, size=(nx, ny))/10.0
def original_interpolation(x, y, new_x):
result = np.empty(y.shape[:-1])
for i in xrange(nx):
for j in xrange(ny):
f = interp1d(x, y[i, j], axis=-1, kind=3)
result[i, j] = f(new_x[i, j])
return result
def fast_interpolation(x, y, new_x):
from scipy.interpolate._fitpack import _bspleval
f = interp1d(x, y, axis=-1, kind=3)
xj,cvals,k = f._spline
result = np.empty_like(new_x)
for (i, j), value in np.ndenumerate(new_x):
result[i, j] = _bspleval(value, x, cvals[:, i, j], k, 0)
return result
r1 = original_interpolation(x, y, new_x)
r2 = fast_interpolation(x, y, new_x)
>>> np.allclose(r1, r2)
True
%timeit original_interpolation(x, y, new_x)
%timeit fast_interpolation(x, y, new_x)
1 loops, best of 3: 3.78 s per loop
100 loops, best of 3: 15.4 ms per loop
Ответ 2
Я не думаю, что interp1d
имеет способ сделать это быстро, поэтому вы не можете избежать этого цикла.
Cython вы, вероятно, все же можете избежать, кодируя линейную интерполяцию с помощью np.searchsorted
, что-то вроде этого (не тестировалось):
def interp3d(x, y, new_x):
assert x.ndim == 1 and y.ndim == 3 and new_x.ndim == 2
assert y.shape[:2] == new_x.shape and x.shape == y.shape[2:]
nx, ny = y.shape[:2]
new_x = new_x.ravel()
j = np.arange(len(new_x))
k = np.searchsorted(x, new_x).clip(1, len(x) - 1)
y = y.reshape(-1, x.shape[0])
p = (new_x - x[k-1]) / (x[k] - x[k-1])
result = (1 - p) * y[j,k-1] + p * y[j,k]
return result.reshape(nx, ny)
Не помогает с кубической интерполяцией.
EDIT: сделал его функцией и исправил ошибки. Некоторые тайминги против Cython (сетка 500x500x500):
In [58]: %timeit interp3d(x, y, new_x)
10 loops, best of 3: 82.7 ms per loop
In [59]: %timeit cyfile.interp3d(x, y, new_x)
10 loops, best of 3: 86.3 ms per loop
In [60]: abs(interp3d(x, y, new_x) - cyfile.interp3d(x, y, new_x)).max()
Out[60]: 2.2204460492503131e-16
Хотя, можно утверждать, что код Cython легче читать.
Ответ 3
Поскольку предложение numpy выше было слишком длинным, я мог бы дождаться такой версии cython для будущей ссылки. Из некоторых слабых эталонов он примерно в 3000 раз быстрее (предоставляется, это только линейная интерполяция и не доходит до interp1d
, но это нормально для этой цели).
import numpy as N
cimport numpy as N
cimport cython
DTYPEf = N.float64
ctypedef N.float64_t DTYPEf_t
@cython.boundscheck(False) # turn of bounds-checking for entire function
@cython.wraparound(False) # turn of bounds-checking for entire function
cpdef interp3d(N.ndarray[DTYPEf_t, ndim=1] x, N.ndarray[DTYPEf_t, ndim=3] y,
N.ndarray[DTYPEf_t, ndim=2] new_x):
"""
interp3d(x, y, new_x)
Performs linear interpolation over the last dimension of a 3D array,
according to new values from a 2D array new_x. Thus, interpolate
y[i, j, :] for new_x[i, j].
Parameters
----------
x : 1-D ndarray (double type)
Array containg the x (abcissa) values. Must be monotonically
increasing.
y : 3-D ndarray (double type)
Array containing the y values to interpolate.
x_new: 2-D ndarray (double type)
Array with new abcissas to interpolate.
Returns
-------
new_y : 3-D ndarray
Interpolated values.
"""
cdef int nx = y.shape[0]
cdef int ny = y.shape[1]
cdef int nz = y.shape[2]
cdef int i, j, k
cdef N.ndarray[DTYPEf_t, ndim=2] new_y = N.zeros((nx, ny), dtype=DTYPEf)
for i in range(nx):
for j in range(ny):
for k in range(1, nz):
if x[k] > new_x[i, j]:
new_y[i, j] = (y[i, j, k] - y[i, j, k - 1]) * \
(new_x[i, j] - x[k-1]) / (x[k] - x[k - 1]) + y[i, j, k - 1]
break
return new_y
Ответ 4
Основываться на @pv. ответ и векторизация внутреннего цикла, следующее дает существенное ускорение (EDIT: изменил дорогой numpy.tile
на использование numpy.lib.stride_tricks.as_strided
):
import numpy
from scipy import interpolate
nx = 30
ny = 40
nz = 50
y = numpy.random.randn(nx, ny, nz)
x = numpy.float64(numpy.arange(0, nz))
# We select some locations in the range [0.1, nz-0.1]
new_z = numpy.random.random_integers(1, (nz-1)*10, size=(nx, ny))/10.0
# y is a 3D ndarray
# x is a 1D ndarray with the abcissa values
# new_z is a 2D array
def original_interpolation():
result = numpy.empty(y.shape[:-1])
for i in range(nx):
for j in range(ny):
f = interpolate.interp1d(x, y[i, j], axis=-1, kind='linear')
result[i, j] = f(new_z[i, j])
return result
grid_x, grid_y = numpy.mgrid[0:nx, 0:ny]
def faster_interpolation():
flat_new_z = new_z.ravel()
k = numpy.searchsorted(x, flat_new_z)
k = k.reshape(nx, ny)
lower_index = [grid_x, grid_y, k-1]
upper_index = [grid_x, grid_y, k]
tiled_x = numpy.lib.stride_tricks.as_strided(x, shape=(nx, ny, nz),
strides=(0, 0, x.itemsize))
z_upper = tiled_x[upper_index]
z_lower = tiled_x[lower_index]
z_step = z_upper - z_lower
z_delta = new_z - z_lower
y_lower = y[lower_index]
result = y_lower + z_delta * (y[upper_index] - y_lower)/z_step
return result
# both should be the same (giving a small difference)
print numpy.max(
numpy.abs(original_interpolation() - faster_interpolation()))
Это дает следующие моменты на моей машине:
In [8]: timeit foo.original_interpolation()
10 loops, best of 3: 102 ms per loop
In [9]: timeit foo.faster_interpolation()
1000 loops, best of 3: 564 us per loop
Переход на nx = 300
, ny = 300
и nz = 500
дает 130-кратное ускорение:
In [2]: timeit original_interpolation()
1 loops, best of 3: 8.27 s per loop
In [3]: timeit faster_interpolation()
10 loops, best of 3: 60.1 ms per loop
Вам понадобится написать собственный алгоритм для кубической интерполяции, но это не должно быть так сложно.
Ответ 5
Хотя есть несколько приятных ответов,
они все еще делают интерполяции 250k в фиксированном 500-длинном массиве:
j250k = np.searchsorted( X500, X250k ) # indices in [0, 500)
Это можно ускорить с помощью LUT, LookUp Table, с слотами 5k:
lut = np.interp( np.arange(5000), X500, np.arange(500) ).round().astype(int)
xscale = (X - X.min()) * (5000 - 1) \
/ (X.max() - X.min())
j = lut.take( xscale.astype(int), mode="clip" ) # take(floats) in numpy 1.7 ?
#---------------------------------------------------------------------------
# X | | | | |
# j 0 1 2 3 4 ...
# LUT |....|.......|.|.............|.... -> int j (+ offset in [0, 1) )
#---------------------------------------------------------------------------
searchsorted
довольно быстро, время ~ ln2 500,
так что это, вероятно, не намного быстрее.
Но LUT очень быстрые в C, простое соотношение скорости и памяти.
Ответ 6
Вы можете использовать map_coordinates для этого:
from numpy import random, meshgrid, arange
from scipy.ndimage import map_coordinates
(nx, ny, nz) = (4, 5, 6)
# some random array
A = random.rand(nx, ny, nz)
# random floating-point indices in [0, nz-1]
Z = random.rand(nx, ny)*(nz-1)
# regular integer indices of shape (nx,ny)
X, Y = meshgrid(arange(nx), arange(ny), indexing='ij')
coords = (X, Y, Z) # X, Y, and Z are of shape (nx, ny)
print map_coordinates(A, coords, order=1, cval=-999.)