Самый быстрый способ создания строго растущих списков в Python

Я хотел бы узнать, что является самым эффективным способом достижения следующего в Python:

Предположим, что у нас есть два списка a и b, которые имеют равную длину и содержат до 1e7 элементов. Однако для удобства иллюстрации мы можем рассмотреть следующее:

a = [2, 1, 2, 3, 4, 5, 4, 6, 5, 7, 8, 9, 8,10,11]
b = [1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15]

Цель состоит в том, чтобы создать строго монотонный список a_new из a, тогда как используется только первая точка выборки выборочных точек с одинаковыми значениями. Те же индексы, которые должны быть удалены в a, также должны быть удалены в b, так что конечный результат будет:

a_new = [2, 3, 4, 5, 6, 7, 8, 9,10,11]
b_new = [1, 4, 5, 6, 8,10,11,12,14,15]

Конечно, это можно сделать с использованием дорогостоящих циклов for, которые, однако, не подходят из-за огромного объема данных.

Любые предложения очень ценятся.

Ответы

Ответ 1

Запуск версии функции @juanpa.arrivillaga с numba

import numba

def psi(A):
    a_cummax = np.maximum.accumulate(A)
    a_new, idx = np.unique(a_cummax, return_index=True)
    return idx

def foo(arr):
    aux=np.maximum.accumulate(arr)
    flag = np.concatenate(([True], aux[1:] != aux[:-1]))
    return np.nonzero(flag)[0]

@numba.jit
def f(A):
    m = A[0]
    a_new, idx = [m], [0]
    for i, a in enumerate(A[1:], 1):
        if a > m:
            m = a
            a_new.append(a)
            idx.append(i)
    return idx

время

%timeit f(a)
The slowest run took 5.37 times longer than the fastest. This could mean that an intermediate result is being cached.
1000000 loops, best of 3: 1.83 µs per loop

%timeit foo(a)
The slowest run took 9.41 times longer than the fastest. This could mean that an intermediate result is being cached.
100000 loops, best of 3: 6.35 µs per loop

%timeit psi(a)
The slowest run took 9.66 times longer than the fastest. This could mean that an intermediate result is being cached.
100000 loops, best of 3: 9.95 µs per loop

Ответ 2

Вы можете рассчитать кумулятивный максимум a, а затем сбросить дубликаты с помощью np.unique, с помощью которого вы также можете записать уникальный индекс, чтобы соответственно подмножество b:

a = np.array([2,1,2,3,4,5,4,6,5,7,8,9,8,10,11])
b = np.array([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15])

a_cummax = np.maximum.accumulate(a)    
a_new, idx = np.unique(a_cummax, return_index=True)

a_new
# array([ 2,  3,  4,  5,  6,  7,  8,  9, 10, 11])

b[idx]
# array([ 1,  4,  5,  6,  8, 10, 11, 12, 14, 15])

Ответ 3

Вот решение на основе ванильного Python, которое делает один проход:

>>> a = [2,1,2,3,4,5,4,6,5,7,8,9,8,10,11]
>>> b = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
>>> a_new, b_new = [], []
>>> last = float('-inf')
>>> for x, y in zip(a, b):
...     if x > last:
...         last = x
...         a_new.append(x)
...         b_new.append(y)
...
>>> a_new
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
>>> b_new
[1, 4, 5, 6, 8, 10, 11, 12, 14, 15]

Мне любопытно посмотреть, как оно сравнивается с решением numpy, которое будет иметь сходную временную сложность, но делает пару проходов на данные.

Вот некоторые тайминги. Во-первых, настройка:

>>> small = ([2,1,2,3,4,5,4,6,5,7,8,9,8,10,11], [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15])
>>> medium = (np.random.randint(1, 10000, (10000,)), np.random.randint(1, 10000, (10000,)))
>>> large = (np.random.randint(1, 10000000, (10000000,)), np.random.randint(1, 10000000, (10000000,)))

И теперь оба подхода:

>>> def monotonic(a, b):
...     a_new, b_new = [], []
...     last = float('-inf')
...     for x,y in zip(a,b):
...         if x > last:
...             last = x
...             a_new.append(x)
...             b_new.append(y)
...     return a_new, b_new
...
>>> def np_monotonic(a, b):
...     a_new, idx = np.unique(np.maximum.accumulate(a), return_index=True)
...     return a_new, b[idx]
...

Обратите внимание, что подходы не являются строго эквивалентными, один остается в ванильной земле Python, а другой остается в массиве numpy. Мы сравним производительность, предполагая, что вы начинаете с соответствующей структуры данных (либо numpy.array, либо list):

Итак, сначала небольшой список, то же самое из примера OP, мы видим, что numpy на самом деле не быстрее, что неудивительно для небольших структур данных:

>>> timeit.timeit("monotonic(a,b)", "from __main__ import monotonic, small; a, b = small", number=10000)
0.039130652003223076
>>> timeit.timeit("np_monotonic(a,b)", "from __main__ import np_monotonic, small, np; a, b = np.array(small[0]), np.array(small[1])", number=10000)
0.10779813499539159

Теперь "средний" список/массив из 10 000 элементов, мы начинаем видеть преимущества numpy:

>>> timeit.timeit("monotonic(a,b)", "from __main__ import monotonic, medium; a, b = medium[0].tolist(), medium[1].tolist()", number=10000)
4.642718859016895
>>> timeit.timeit("np_monotonic(a,b)", "from __main__ import np_monotonic, medium; a, b = medium", number=10000)
1.3776302759943064

Интересно, что преимущество кажется суженным с "большими" массивами порядка 1е7 элементов:

>>> timeit.timeit("monotonic(a,b)", "from __main__ import monotonic, large; a, b = large[0].tolist(), large[1].tolist()", number=10)
4.400254560023313
>>> timeit.timeit("np_monotonic(a,b)", "from __main__ import np_monotonic, large; a, b = large", number=10)
3.593393853981979

Обратите внимание, что в последней паре таймингов я делал их только по 10 раз, но если у кого-то есть лучшая машина или больше терпения, пожалуйста, не стесняйтесь увеличивать number

Ответ 4

unique с return_index использует argsort. С maximum.accumulate, который не нужен. Поэтому мы можем cannibalize unique и делать:

In [313]: a = [2,1,2,3,4,5,4,6,5,7,8,9,8,10,11]
In [314]: arr = np.array(a)
In [315]: aux = np.maximum.accumulate(arr)
In [316]: flag = np.concatenate(([True], aux[1:] != aux[:-1])) # key unique step
In [317]: idx = np.nonzero(flag)
In [318]: idx
Out[318]: (array([ 0,  3,  4,  5,  7,  9, 10, 11, 13, 14], dtype=int32),)
In [319]: arr[idx]
Out[319]: array([ 2,  3,  4,  5,  6,  7,  8,  9, 10, 11])
In [320]: np.array(b)[idx]
Out[320]: array([ 1,  4,  5,  6,  8, 10, 11, 12, 14, 15])

In [323]: np.unique(aux, return_index=True)[1]
Out[323]: array([ 0,  3,  4,  5,  7,  9, 10, 11, 13, 14], dtype=int32)

def foo(arr):
    aux=np.maximum.accumulate(arr)
    flag = np.concatenate(([True], aux[1:] != aux[:-1]))
    return np.nonzero(flag)[0]

In [330]: timeit foo(arr)
....
100000 loops, best of 3: 12.5 µs per loop
In [331]: timeit np.unique(np.maximum.accumulate(arr), return_index=True)[1]
....
10000 loops, best of 3: 21.5 µs per loop

С формой (10000,) medium этот уникальный тип имеет существенное преимущество в скорости:

In [334]: timeit np.unique(np.maximum.accumulate(medium[0]), return_index=True)[1]
1000 loops, best of 3: 351 µs per loop
In [335]: timeit foo(medium[0])
The slowest run took 4.14 times longer ....
10000 loops, best of 3: 48.9 µs per loop

[1]: используйте np.source(np.unique), чтобы увидеть код, или? в IPython