Можно ли ускорить этот цикл в Python?
Обычный способ сопоставления функции в numpy.narray
как np.array[map(some_func,x)]
или vectorize(f)(x)
не может предоставить индекс.
Следующий код - это простой пример, который обычно встречается во многих приложениях.
dis_mat = np.zeros([feature_mat.shape[0], feature_mat.shape[0]])
for i in range(feature_mat.shape[0]):
for j in range(i, feature_mat.shape[0]):
dis_mat[i, j] = np.linalg.norm(
feature_mat[i, :] - feature_mat[j, :]
)
dis_mat[j, i] = dis_mat[i, j]
Есть ли способ ускорить его?
Спасибо за помощь! Самый быстрый способ ускорить этот код - это использование функции, которая @user2357112 прокомментировала:
from scipy.spatial.distance import pdist,squareform
dis_mat = squareform(pdist(feature_mat))
@Julien также хорош, если feature_mat
невелик, но когда feature_mat
составляет 1000 к 2000 году, тогда ему требуется около 40 ГБ памяти.
Ответы
Ответ 1
Вы можете создать новую ось и трансляцию:
dis_mat = np.linalg.norm(feature_mat[:,None] - feature_mat, axis=-1)
Timing:
feature_mat = np.random.rand(100,200)
def a():
dis_mat = np.zeros([feature_mat.shape[0], feature_mat.shape[0]])
for i in range(feature_mat.shape[0]):
for j in range(i, feature_mat.shape[0]):
dis_mat[i, j] = np.linalg.norm(
feature_mat[i, :] - feature_mat[j, :]
)
dis_mat[j, i] = dis_mat[i, j]
def b():
dis_mat = np.linalg.norm(feature_mat[:,None] - feature_mat, axis=-1)
%timeit a()
100 loops, best of 3: 20.5 ms per loop
%timeit b()
100 loops, best of 3: 11.8 ms per loop
Ответ 2
SciPy поставляется с функцией специально для вычисления типа попарных расстояний, которые вы вычисляете. Он scipy.spatial.distance.pdist
, и он создает расстояния в сжатом формате, который в основном сохраняет только верхний треугольник матрицы расстояния, но вы можете преобразовать результат в квадратную форму с scipy.spatial.distance.squareform
:
from scipy.spatial.distance import pdist, squareform
distance_matrix = squareform(pdist(feature_mat))
Это позволяет избежать гигантских промежуточных массивов, требуемых с помощью прямого векторизованного решения, поэтому оно быстрее и работает на больших входах. Он теряет время подход, который использует алгебраические манипуляции, чтобы dot
обрабатывать тяжелую работу.
pdist
также поддерживает широкий диапазон альтернативных метрик расстояния, если вы решите, что хотите чего-то другого, кроме евклидова расстояния.
# Manhattan distance!
distance_matrix = squareform(pdist(feature_mat, 'cityblock'))
# Cosine distance!
distance_matrix = squareform(pdist(feature_mat, 'cosine'))
# Correlation distance!
distance_matrix = squareform(pdist(feature_mat, 'correlation'))
# And more! Check out the docs.
Ответ 3
Фактор, что можно сделать, и использовать оптимизацию np.dot
на матрице k x k
в небольшом месте памяти (kxk):
def c(m):
xy=np.dot(m,m.T) # O(k^3)
x2=y2=(m*m).sum(1) #O(k^2)
d2=np.add.outer(x2,y2)-2*xy #O(k^2)
d2.flat[::len(m)+1]=0 # Rounding issues
return np.sqrt(d2) # O (k^2)
И для сравнения:
def d(m):
return squareform(pdist(m))
Вот "время (it)" для начальных матриц k * k:
![Введите описание изображения здесь]()
Два алгоритма: O (k ^ 3), но c(m)
делает O (k ^ 3) часть задания через np.dot
, критическую node линейной алгебры, которая приносит пользу всем оптимизации (многоядерные и т.д.). pdist
- это просто циклы, как показано в источнике .
Это объясняет коэффициент 15x для больших массивов, даже если pdist
использует симметрию матрицы, вычисляя только половину членов.
Ответ 4
Один из способов, по которым я думал, чтобы избежать смешивания циклов NumPy и for
, было бы создание массива индексов с использованием версии этого создателя индекса, которая позволяет заменить:
import numpy as np
from itertools import product, chain
from scipy.special import comb
def comb_index(n, k):
count = comb(n, k, exact=True, repetition=True)
index = np.fromiter(chain.from_iterable(product(range(n), repeat=k)),
int, count=count*k)
return index.reshape(-1, k)
Затем мы просто берем каждую из этих пар массивов, вычисляем разницу между ними, изменяем результирующий массив и берем норму каждой из строк массива:
reshape_mat = np.diff(feature_mat[comb_index(feature_mat.shape[0], 2), :], axis=1).reshape(-1, feature_mat.shape[1])
dis_list = np.linalg.norm(reshape_mat, axis=-1)
Обратите внимание, что dis_list
- это просто список всех n*(n+1)/2
возможных norms
. Это работает с той же скоростью, что и другой ответ для feature_mat
, который он предоставил, и при сравнении размеров байтов наших самых больших разделов
(feature_mat[:,None] - feature_mat).nbytes == 16000000
а
np.diff(feature_mat[comb_index(feature_mat.shape[0], 2), :], axis=1).reshape(-1, feature_mat.shape[1]).nbytes == 8080000
Для большинства входов мой использует только половину хранилища: все еще неоптимальный, но незначительное улучшение.
Ответ 5
На основе np.triu_indices
, если вы действительно хотите сделать это с помощью чистого NumPy:
s = feature_mat.shape[0]
i, j = np.triu_indices(s, 1) # All possible combinations of indices
dist_mat = np.empty((s, s)) # Don't waste time filling with zeros
np.einsum('ii->i', dist_mat)[:] = 0 # When you can just fill the diagonal
dist_mat[i, j] = dist_mat[j, i] = np.linalg.norm(feature_mat[i] - feature_mat[j], axis=-1)
# Vectorized version of your original process
Преимущество этого метода в вещании заключается в том, что вы можете сделать это в кусках:
n = 10000000 # Based on your RAM available
for k in range (0, i.size, n):
i_ = i[k: k + n]
j_ = j[k: k + n]
dist_mat[i_, j_] = dist_mat[j_, i_] = \
np.linalg.norm(feature_mat[i_] - feature_mat[j_], axis = -1)
Ответ 6
Начнем с переписывания этого в терминах функции:
dist(mat, i, j):
return np.linalg.norm(mat[i, :] - mat[j, :])
size = feature_mat.shape[0]
for i in range(size):
for j in range(size):
dis_mat[i, j] = dist(feature_mat, i, j)
Это можно переписать в (несколько более) векторной форме как:
v = [dist(feature_map, i, j) for i in range(size) for j in range(size)]
dist_mat = np.array(v).reshape(size, size)
Обратите внимание, что мы по-прежнему полагаемся на Python, а не на NumPy для некоторых вычислений, но это шаг к векторизации. Также обратите внимание, что dist(i, j)
является симметричным, поэтому мы могли бы дополнительно сократить вычисления примерно наполовину. Возможно, учитывая:
v = [dist(feature_map, i, j) for i in range(size) for j in range(i + 1)]
Теперь сложный бит назначает эти вычисленные значения правильным элементам в dist_mat
.
Как быстро это выполняется, зависит от стоимости вычислений dist(i, j)
. Для небольших feature_mat
s стоимость пересчета недостаточно высока, чтобы беспокоиться об этом. Но для больших матриц вы определенно не хотите компрометировать.