Есть ли способ сделать numpy.argmin() так же быстро, как min()?

Я пытаюсь найти минимальные индексы массивов по одному измерению очень большого массива 2D numpy. Я нахожу, что это очень медленно (уже пробовал ускорить его с узким местом, что было лишь минимальным улучшением). Однако, взятие прямого минимума оказывается на порядок быстрее:

import numpy as np
import time

randvals = np.random.rand(3000,160000)
start = time.time()
minval = randvals.min(axis=0)
print "Took {0:.2f} seconds to compute min".format(time.time()-start)
start = time.time()
minindex = np.argmin(randvals,axis=0)
print "Took {0:.2f} seconds to compute argmin".format(time.time()-start)

На моей машине эти выходы:

Took 0.83 seconds to compute min
Took 9.58 seconds to compute argmin

Есть ли причина, по которой argmin работает намного медленнее? Есть ли способ ускорить его до сопоставимого с мин?

Ответы

Ответ 1

In [1]: import numpy as np

In [2]: a = np.random.rand(3000, 16000)

In [3]: %timeit a.min(axis=0)
1 loops, best of 3: 421 ms per loop

In [4]: %timeit a.argmin(axis=0)
1 loops, best of 3: 1.95 s per loop

In [5]: %timeit a.min(axis=1)
1 loops, best of 3: 302 ms per loop

In [6]: %timeit a.argmin(axis=1)
1 loops, best of 3: 303 ms per loop

In [7]: %timeit a.T.argmin(axis=1)
1 loops, best of 3: 1.78 s per loop

In [8]: %timeit np.asfortranarray(a).argmin(axis=0)
1 loops, best of 3: 1.97 s per loop

In [9]: b = np.asfortranarray(a)

In [10]: %timeit b.argmin(axis=0)
1 loops, best of 3: 329 ms per loop

Возможно, min достаточно умен, чтобы выполнять свою работу последовательно над массивом (следовательно, с локальностью кэш-памяти), а argmin перескакивает вокруг массива (вызывая много промахов в кеше)?

В любом случае, если вы хотите сохранить randvals в качестве массива, упорядоченного по Фортрану, с самого начала, это будет быстрее, хотя копирование в Fortran-order не помогает.

Ответ 2

Я просто взглянул на исходный код, и пока я не совсем понимаю, почему дела идут так, как они есть, вот что происходит:

  • np.min - это в основном вызов np.minimum.reduce.

  • np.argmin сначала перемещает ось, в которой вы хотите работать, до конца кортежа формы, затем делает ее смежным массивом, который, конечно, запускает копию полного массива, если ось не была последней для начала.

Поскольку выполняется копирование, вы можете получить объявление и попытаться создать более дешевые массивы:

a = np.random.rand(1000, 2000)

def fast_argmin_axis_0(a):
    matches = np.nonzero((a == np.min(a, axis=0)).ravel())[0]
    rows, cols = np.unravel_index(matches, a.shape)
    argmin_array = np.empty(a.shape[1], dtype=np.intp)
    argmin_array[cols] = rows
    return argmin_array

In [8]: np.argmin(a, axis=0)
Out[8]: array([230, 532, 815, ..., 670, 702, 989], dtype=int64)

In [9]: fast_argmin_axis_0(a)
Out[9]: array([230, 532, 815, ..., 670, 702, 989], dtype=int64)

In [10]: %timeit np.argmin(a, axis=0)
10 loops, best of 3: 27.3 ms per loop

In [11]: %timeit fast_argmin_axis_0(a)
100 loops, best of 3: 15 ms per loop

Я бы не стал допустить, чтобы текущая реализация была ошибкой, так как могут быть веские причины для numpy делать то, что она делает так, как она это делает, но что этот вид обмана может ускорить то, что должно быть очень оптимизированная функция, настоятельно предлагает сделать что-то лучше.

Ответ 3

Я просто попал в ту же проблему и обнаружил большую разницу в производительности, когда выбрана ось 0 для нахождения минимума. Как было предложено, проблема, похоже, связана с внутренним копированием массива.

Я разработал довольно простой подход с использованием Cython, который одновременно устанавливает минимальные значения и их положение в массиве 2D numpy вдоль данной оси. Обратите внимание, что для axis = 0 алгоритм работает с массивом столбцов (максимальное число, указанное константой blocksize - здесь установлено эквивалентно 8 кбайтам данных) одновременно, чтобы эффективно использовать кеш:

%%cython -c=-O2 -c=-march=native

import numpy as np
cimport numpy as np 
cimport cython
from libc.stdint cimport uint32_t

@cython.boundscheck(False)
@cython.wraparound(False)
cdef void _minargmin_2d_64_axis_0(uint32_t[:] minloc, double[:] minimum, double[:, :] arr) nogil:
    """
    Find the minimum and argmin for a 2D array of 64-bit floats along axis 0
    Parameters:
    -----------
    arr: numpy_array, dtype=np.float64, shape=(m, n)
       The array for which the minima should be computed.
    minloc: numpy_array, dtype=np.uint32, shape=(n)
       Stores the rows where the minima occur for each column.
    minimum: numpy_array, dtype=np.float64, shape=(n)
       The minima along each column.
    """

    # Columns of the matrix are accessed in blocks to increase cache performance.
    # Specify the number of columns here:

    DEF blocksize = 1024

    cdef int i, j, k
    cdef double minim[blocksize]
    cdef uint32_t minimloc[blocksize]

    # Read blocks of data to make good use of the cache

    cdef int blocks
    blocks  = arr.shape[1] / blocksize
    remains = arr.shape[1] % blocksize

    for i in xrange(0, blocksize * blocks, blocksize):

        for k in xrange(blocksize):
            minim[k]    = arr[0, i + k]
            minimloc[k] = 0

        for j in xrange(1, arr.shape[0]):

            for k in xrange(blocksize):

                if arr[j, i + k] < minim[k]:
                    minim[k] = arr[j, i + k]
                    minimloc[k] = j

        for k in xrange(blocksize):
            minimum[i + k] = minim[k]
            minloc[i + k]  = minimloc[k]

    # Work on the final 'incomplete' block

    i = blocksize * blocks

    for k in xrange(remains):
        minim[k]    = arr[0, i + k]
        minimloc[k] = 0

    for j in xrange(1, arr.shape[0]):

        for k in xrange(remains):

            if arr[j, i + k] < minim[k]:
                minim[k] = arr[j, i + k]
                minimloc[k] = j

    for k in xrange(remains):
        minimum[i + k] = minim[k]
        minloc[i + k]  = minimloc[k]

    # Done!


@cython.boundscheck(False)
@cython.wraparound(False)
cdef void _minargmin_2d_64_axis_1(uint32_t[:] minloc, double[:] minimum, double[:, :] arr) nogil:
    """
    Find the minimum and argmin for a 2D array of 64-bit floats along axis 1
    Parameters:
    -----------
    arr: numpy_array, dtype=np.float64, shape=(m, n)
       The array for which the minima should be computed.
    minloc: numpy_array, dtype=np.uint32, shape=(m)
       Stores the rows where the minima occur for each row.
    minimum: numpy_array, dtype=np.float64, shape=(m)
       The minima along each row.
    """
    cdef int i
    cdef int j
    cdef double minim
    cdef uint32_t minimloc

    for i in xrange(arr.shape[0]):
        minim = arr[i, 0]
        minimloc = 0

        for j in xrange(1, arr.shape[1]):
            if arr[i, j] < minim:
                minim = arr[i, j]
                minimloc = j

        minimum[i] = minim
        minloc[i]  = minimloc


@cython.boundscheck(False)
@cython.wraparound(False)
cdef void _minargmin_2d_32_axis_0(uint32_t[:] minloc, float[:] minimum, float[:, :] arr) nogil:
    """
    Find the minimum and argmin for a 2D array of 32-bit floats along axis 0
    Parameters:
    -----------
    arr: numpy_array, dtype=np.float32, shape=(m, n)
       The array for which the minima should be computed.
    minloc: numpy_array, dtype=np.uint32, shape=(n)
       Stores the rows where the minima occur for each column.
    minimum: numpy_array, dtype=np.float32, shape=(n)
       The minima along each column.
    """

    # Columns of the matrix are accessed in blocks to increase cache performance.
    # Specify the number of columns here:  

    DEF blocksize = 2048

    cdef int i, j, k
    cdef float minim[blocksize]
    cdef uint32_t minimloc[blocksize]

    # Read blocks of data to make good use of the cache

    cdef int blocks
    blocks  = arr.shape[1] / blocksize
    remains = arr.shape[1] % blocksize

    for i in xrange(0, blocksize * blocks, blocksize):

        for k in xrange(blocksize):
            minim[k]    = arr[0, i + k]
            minimloc[k] = 0

        for j in xrange(1, arr.shape[0]):

            for k in xrange(blocksize):

                if arr[j, i + k] < minim[k]:
                    minim[k] = arr[j, i + k]
                    minimloc[k] = j

        for k in xrange(blocksize):
            minimum[i + k] = minim[k]
            minloc[i + k]  = minimloc[k]

    # Work on the final 'incomplete' block

    i = blocksize * blocks

    for k in xrange(remains):
        minim[k]    = arr[0, i + k]
        minimloc[k] = 0

    for j in xrange(1, arr.shape[0]):

        for k in xrange(remains):

            if arr[j, i + k] < minim[k]:
                minim[k] = arr[j, i + k]
                minimloc[k] = j

    for k in xrange(remains):
        minimum[i + k] = minim[k]
        minloc[i + k]  = minimloc[k]

    # Done!

@cython.boundscheck(False)
@cython.wraparound(False)
cdef void _minargmin_2d_32_axis_1(uint32_t[:] minloc, float[:] minimum, float[:, :] arr) nogil:
    """
    Find the minimum and argmin for a 2D array of 32-bit floats along axis 1
    Parameters:
    -----------
    arr: numpy_array, dtype=np.float32, shape=(m, n)
       The array for which the minima should be computed.
    minloc: numpy_array, dtype=np.uint32, shape=(m)
       Stores the rows where the minima occur for each row.
    minimum: numpy_array, dtype=np.float32, shape=(m)
       The minima along each row.
    """
    cdef int i
    cdef int j
    cdef float minim
    cdef uint32_t minimloc

    for i in xrange(arr.shape[0]):
        minim = arr[i, 0]
        minimloc = 0

        for j in xrange(1, arr.shape[1]):
            if arr[i, j] < minim:
                minim = arr[i, j]
                minimloc = j

        minimum[i] = minim
        minloc[i]  = minimloc

def Min_Argmin(array_2d, axis = 1):
    """
    Find the minima and corresponding locations (argmin) of a two-dimensional
    numpy array of floats along a given axis simultaneously, and returns them
    as a tuple of arrays (min_2d, argmin_2d).

    (Note: float16 arrays will be internally transformed to float32 arrays.)
    ----------
    array_2d: numpy_array, dtype=np.float32 or np.float64, shape=(m, n)
       The array for which the minima should be computed.
    axis : int, 0 or 1 (default) : 
        The axis along which minima are computed.
    min_2d: numpy_array, dtype=np.uint8, shape=(m) or shape=(n):
       The array where the minima along the given axis are stored.
    argmin_2d:
       The array storing the row/column where the minimum occurs.
    """

    # Sanity checks:
    if len(array_2d.shape) != 2:
        raise IndexError('Min_Argmin: Number of dimensions of array must be 2')

    if not (axis == 0 or axis == 1):
        raise ValueError('Min_Argmin: Invalid axis specified')

    arr_type = array_2d.dtype

    if not arr_type in ('float16', 'float32', 'float64'):
        raise ValueError('Min_Argmin: Not a float array')

    # Transform float16 arrays
    if arr_type == 'float16':
        array_2d = array_2d.astype(np.float32)

    # Run analysis

    if arr_type == 'float64':

        # Double accuracy

        min_array    = np.zeros(array_2d.shape[1 - axis], dtype = np.float64)
        argmin_array = np.zeros(array_2d.shape[1 - axis], dtype = np.uint32)

        if (axis == 0):
            _minargmin_2d_64_axis_0(argmin_array, min_array, array_2d)

        else:
            _minargmin_2d_64_axis_1(argmin_array, min_array, array_2d)

    else:

        # Single accuracy

        min_array    = np.zeros(array_2d.shape[1 - axis], dtype = np.float32)
        argmin_array = np.zeros(array_2d.shape[1 - axis], dtype = np.uint32)

        if (axis == 0):
            _minargmin_2d_32_axis_0(argmin_array, min_array, array_2d)

        else:
            _minargmin_2d_32_axis_1(argmin_array, min_array, array_2d)

        # Transform back to float16 type as necessary

        if arr_type == 'float16':
            min_array = min_array.astype(np.float16)


    # Return results
    return min_array, argmin_array

Код можно поместить и скомпилировать в ячейку ноутбука IPython после загрузки поддержки Cython:

%load_ext Cython

а затем вызывается в виде:

min_array, argmin_array = Min_Argmin(two_dim_array, axis = 0 or 1)

Пример синхронизации:

random_array = np.random.rand(20000, 20000).astype(np.float32)

%timeit min_array, argmin_array = Min_Argmin(random_array, axis = 0)
%timeit min_array, argmin_array = Min_Argmin(random_array, axis = 1)

1 loops, best of 3: 405 ms per loop
1 loops, best of 3: 307 ms per loop

Для сравнения:

%%timeit 
min_array    = random_array.min(axis = 0)
argmin_array = random_array.argmin(axis = 0)

1 loops, best of 3: 10.3 s per loop

%%timeit 
min_array    = random_array.min(axis = 1)
argmin_array = random_array.argmin(axis = 1)

1 loops, best of 3: 423 ms per loop

Итак, существует значительное ускорение для axis = 0 (и все еще небольшое преимущество для axis = 1, если вас интересуют как минимум, так и местоположение).