Примените функцию к каждой строке ndarray
У меня есть эта функция для вычисления квадрата расстояния Махаланобиса вектора x, означающего:
def mahalanobis_sqdist(x, mean, Sigma):
'''
Calculates squared Mahalanobis Distance of vector x
to distibutions' mean
'''
Sigma_inv = np.linalg.inv(Sigma)
xdiff = x - mean
sqmdist = np.dot(np.dot(xdiff, Sigma_inv), xdiff)
return sqmdist
У меня есть массив numpy, который имеет форму (25, 4)
. Итак, я хочу применить эту функцию ко всем 25 строкам моего массива без цикла for. Итак, в основном, как я могу написать векторизованную форму этого цикла:
for r in d1:
mahalanobis_sqdist(r[0:4], mean1, Sig1)
где mean1
и Sig1
:
>>> mean1
array([ 5.028, 3.48 , 1.46 , 0.248])
>>> Sig1 = np.cov(d1[0:25, 0:4].T)
>>> Sig1
array([[ 0.16043333, 0.11808333, 0.02408333, 0.01943333],
[ 0.11808333, 0.13583333, 0.00625 , 0.02225 ],
[ 0.02408333, 0.00625 , 0.03916667, 0.00658333],
[ 0.01943333, 0.02225 , 0.00658333, 0.01093333]])
Я пробовал следующее, но это не сработало:
>>> vecdist = np.vectorize(mahalanobis_sqdist)
>>> vecdist(d1, mean1, Sig1)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/python2.7/dist-packages/numpy/lib/function_base.py", line 1862, in __call__
theout = self.thefunc(*newargs)
File "<stdin>", line 6, in mahalanobis_sqdist
File "/usr/lib/python2.7/dist-packages/numpy/linalg/linalg.py", line 445, in inv
return wrap(solve(a, identity(a.shape[0], dtype=a.dtype)))
IndexError: tuple index out of range
Ответы
Ответ 1
Чтобы применить функцию к каждой строке массива, вы можете использовать:
np.apply_along_axis(mahalanobis_sqdist, 1, d1, mean1, Sig1)
В этом случае, однако, есть лучший способ. Вам не нужно применять функцию к каждой строке. Вместо этого вы можете применять операции NumPy для всего массива d1
для вычисления того же результата. np.einsum может заменить for-loop
и два вызова np.dot
:
def mahalanobis_sqdist2(d, mean, Sigma):
Sigma_inv = np.linalg.inv(Sigma)
xdiff = d - mean
return np.einsum('ij,im,mj->i', xdiff, xdiff, Sigma_inv)
Вот несколько этапов:
import numpy as np
np.random.seed(1)
def mahalanobis_sqdist(x, mean, Sigma):
'''
Calculates squared Mahalanobis Distance of vector x
to distibutions mean
'''
Sigma_inv = np.linalg.inv(Sigma)
xdiff = x - mean
sqmdist = np.dot(np.dot(xdiff, Sigma_inv), xdiff)
return sqmdist
def mahalanobis_sqdist2(d, mean, Sigma):
Sigma_inv = np.linalg.inv(Sigma)
xdiff = d - mean
return np.einsum('ij,im,mj->i', xdiff, xdiff, Sigma_inv)
def using_loop(d1, mean, Sigma):
expected = []
for r in d1:
expected.append(mahalanobis_sqdist(r[0:4], mean1, Sig1))
return np.array(expected)
d1 = np.random.random((25,4))
mean1 = np.array([ 5.028, 3.48 , 1.46 , 0.248])
Sig1 = np.cov(d1[0:25, 0:4].T)
expected = using_loop(d1, mean1, Sig1)
result = np.apply_along_axis(mahalanobis_sqdist, 1, d1, mean1, Sig1)
result2 = mahalanobis_sqdist2(d1, mean1, Sig1)
assert np.allclose(expected, result)
assert np.allclose(expected, result2)
In [92]: %timeit mahalanobis_sqdist2(d1, mean1, Sig1)
10000 loops, best of 3: 31.1 µs per loop
In [94]: %timeit using_loop(d1, mean1, Sig1)
1000 loops, best of 3: 569 µs per loop
In [91]: %timeit np.apply_along_axis(mahalanobis_sqdist, 1, d1, mean1, Sig1)
1000 loops, best of 3: 806 µs per loop
Таким образом, mahalanobis_sqdist2
примерно на 18 раз быстрее, чем a for-loop
, и на 26 раз быстрее, чем при использовании np.apply_along_axis
.
Обратите внимание, что np.apply_along_axis
, np.vectorize
, np.frompyfunc
- это служебные функции Python. Под капотом они используют for-
или while-loop
s. Здесь нет реальной "векторизации". Они могут предоставлять синтаксическую помощь, но не ожидайте, что они заставят ваш код работать лучше, чем for-loop
, который вы пишете сами.
Ответ 2
Ответ @unutbu отлично работает для применения любой функции к строкам массива.
В этом конкретном случае есть некоторые математические симметрии, которые вы можете использовать, что значительно ускорит процесс, если вы работаете с большими массивами.
Вот модифицированная версия вашей функции:
def mahalanobis_sqdist3(x, mean, Sigma):
Sigma_inv = np.linalg.inv(Sigma)
xdiff = x - mean
return (xdiff.dot(Sigma_inv)*xdiff).sum(axis=-1)
Если вы в конечном итоге используете какой-либо большой Sigma
, я бы рекомендовал вам кэшировать Sigma_inv
и передать это как аргумент вашей функции.
Так как это 4x4 в этом примере, это не имеет значения.
Я покажу, как бороться с большим Sigma
в любом случае для всех, кто сталкивается с этим.
Если вы не собираетесь использовать один и тот же Sigma
несколько раз, вы не сможете его кэшировать, поэтому вместо инвертирования матрицы вы можете использовать другой метод для решения линейной системы.
Здесь я буду использовать разложение LU, встроенное в SciPy.
Это только улучшает время, если количество столбцов x
велико по сравнению с его количеством строк.
Вот функция, которая показывает этот подход:
from scipy.linalg import lu_factor, lu_solve
def mahalanobis_sqdist4(x, mean, Sigma):
xdiff = x - mean
Sigma_inv = lu_factor(Sigma)
return (xdiff.T*lu_solve(Sigma_inv, xdiff.T)).sum(axis=0)
Вот некоторые тайминги.
Я включу версию с einsum
, как указано в другом ответе.
import numpy as np
Sig1 = np.array([[ 0.16043333, 0.11808333, 0.02408333, 0.01943333],
[ 0.11808333, 0.13583333, 0.00625 , 0.02225 ],
[ 0.02408333, 0.00625 , 0.03916667, 0.00658333],
[ 0.01943333, 0.02225 , 0.00658333, 0.01093333]])
mean1 = np.array([ 5.028, 3.48 , 1.46 , 0.248])
x = np.random.rand(25, 4)
%timeit np.apply_along_axis(mahalanobis_sqdist, 1, x, mean1, Sig1)
%timeit mahalanobis_sqdist2(x, mean1, Sig1)
%timeit mahalanobis_sqdist3(x, mean1, Sig1)
%timeit mahalanobis_sqdist4(x, mean1, Sig1)
даяние:
1000 loops, best of 3: 973 µs per loop
10000 loops, best of 3: 36.2 µs per loop
10000 loops, best of 3: 40.8 µs per loop
10000 loops, best of 3: 83.2 µs per loop
Однако изменение размеров используемых массивов изменяет результаты синхронизации.
Например, допустив x = np.random.rand(2500, 4)
, тайминги:
10 loops, best of 3: 95 ms per loop
1000 loops, best of 3: 355 µs per loop
10000 loops, best of 3: 131 µs per loop
1000 loops, best of 3: 337 µs per loop
И пусть x = np.random.rand(1000, 1000)
, Sigma1 = np.random.rand(1000, 1000)
и mean1 = np.random.rand(1000)
, тайминги:
1 loops, best of 3: 1min 24s per loop
1 loops, best of 3: 2.39 s per loop
10 loops, best of 3: 155 ms per loop
10 loops, best of 3: 99.9 ms per loop
Изменить. Я заметил, что один из других ответов использовал разложение Холески.
Учитывая, что Sigma
является симметричным и положительно определенным, мы можем добиться лучших результатов, чем мои предыдущие результаты.
Есть несколько хороших подпрограмм BLAS и LAPACK, доступных через SciPy, которые могут работать с симметричными положительно определенными матрицами.
Вот две более быстрые версии.
from scipy.linalg.fblas import dsymm
def mahalanobis_sqdist5(x, mean, Sigma_inv):
xdiff = x - mean
Sigma_inv = la.inv(Sigma)
return np.einsum('...i,...i->...',dsymm(1., Sigma_inv, xdiff.T).T, xdiff)
from scipy.linalg.flapack import dposv
def mahalanobis_sqdist6(x, mean, Sigma):
xdiff = x - mean
return np.einsum('...i,...i->...', xdiff, dposv(Sigma, xdiff.T)[1].T)
Первый по-прежнему инвертирует Сигму.
Если вы предварительно вычислите обратное и повторно используете его, это намного быстрее (размер 1000x1000 занимает 35.6ms на моей машине с предварительно вычисленным обратным).
Я также использовал einsum, чтобы взять продукт, затем суммировать по последней оси.
Это оказалось немного быстрее, чем что-то вроде (A * B).sum(axis=-1)
.
Эти две функции дают следующие тайминги:
Первый тестовый пример:
10000 loops, best of 3: 55.3 µs per loop
100000 loops, best of 3: 14.2 µs per loop
Второй тестовый пример:
10000 loops, best of 3: 121 µs per loop
10000 loops, best of 3: 79 µs per loop
Третий тестовый пример:
10 loops, best of 3: 92.5 ms per loop
10 loops, best of 3: 48.2 ms per loop
Ответ 3
Только что увидел хороший комментарий к reddit, который может ускорить работу даже немного:
Это неудивительно для тех, кто регулярно использует numpy. Для петель в python ужасно медленны. Собственно, einsum тоже довольно медленный. Здесь версия, которая быстрее, если у вас много векторов (500 векторов в 4 измерениях достаточно, чтобы сделать эту версию быстрее, чем einsum на моей машине):
def no_einsum(d, mean, Sigma):
L_inv = np.linalg.inv(numpy.linalg.cholesky(Sigma))
xdiff = d - mean
return np.sum(np.dot(xdiff, L_inv.T)**2, axis=1)
Если ваши точки также являются размерными, то вычисление инверсного медленная (и вообще плохая идея), и вы можете сэкономить время на решение системы напрямую (500 векторов в 250 измерениях достаточно чтобы сделать эту версию самой быстрой на моей машине):
def no_einsum_solve(d, mean, Sigma):
L = numpy.linalg.cholesky(Sigma)
xdiff = d - mean
return np.sum(np.linalg.solve(L, xdiff.T)**2, axis=0)
Ответ 4
Проблема заключается в том, что np.vectorize
векторизовать по всем аргументам, но вам нужно векторизовать только по первому. Вам нужно использовать excluded
аргумент ключевого слова vectorize
:
np.vectorize(mahalanobis_sqdist, excluded=[1, 2])