Вычисление корреляции расширяющегося/скользящего окна Pandas с p-значением

Предположим, у меня есть DataFrame, на котором я хочу вычислить скользящие или расширяющиеся корреляции Пирсона между двумя столбцами.

import numpy as np
import pandas as pd
import scipy.stats as st


df = pd.DataFrame({'x': np.random.rand(10000), 'y': np.random.rand(10000)})

Со встроенной функциональностью pandas это очень быстро рассчитать

expanding_corr = df['x'].expanding(50).corr(df['y'])
rolling_corr = df['x'].rolling(50).corr(df['y'])

Однако, если я хочу получить p-значения, связанные с этими корреляциями, лучшее, что я могу сделать, это определить пользовательскую функцию groupby и передать apply к объекту groupby

def custom_roll(df, w, **kwargs):

    v = df.values
    d0, d1 = v.shape
    s0, s1 = v.strides
    a = np.lib.stride_tricks.as_strided(v, (d0 - (w - 1), w, d1), (s0, s0, s1))
    rolled_df = pd.concat({
        row: pd.DataFrame(values, columns=df.columns)
        for row, values in zip(df.index[(w-1):], a)
    })
    return rolled_df.groupby(level=0, **kwargs)

c_df = custom_roll(df, 50).apply(lambda df: st.pearsonr(df['x'], df['y']))

Теперь c_df содержит соответствующие корреляции и, что важно, связанные с ними p-значения.

Однако этот метод чрезвычайно медленный по сравнению со встроенным методом pandas, что означает, что он не подходит, так как практически я вычисляю эти корреляции тысячи раз в процессе оптимизации. Кроме того, я не уверен, как расширить функцию custom_roll для расширения окон.

Кто-нибудь может указать мне в направлении использования numpy чтобы получить p-значения по сравнению с расширяющимися окнами на векторизованных скоростях?

Ответы

Ответ 1

Я не мог придумать умного способа сделать это в пандах, используя прямое rolling, но учтите, что вы можете вычислить значение p, учитывая коэффициент корреляции.

Коэффициент корреляции Пирсона следует t-распределению Стьюдента, и вы можете получить значение p, подключив его к cdf файлу, определенному неполной бета-функцией scipy.special.betainc. Это звучит сложно, но может быть сделано в несколько строк кода. Ниже приведена функция, которая вычисляет значение p с учетом коэффициента корреляции corr и размера выборки n. Это на самом деле основано на реализации scipy, которую вы использовали.

import pandas as pd
from scipy.special import betainc

def pvalue(corr, n=50):
    df = n - 2
    t_squared = corr**2 * (df / ((1.0 - corr) * (1.0 + corr)))
    prob = betainc(0.5*df, 0.5, df/(df+t_squared))
    return prob

Затем вы можете применить эту функцию к уже имеющимся значениям корреляции.

rolling_corr = df['x'].rolling(50).corr(df['y'])
pvalue(rolling_corr)

Это может быть не идеальное векторизованное решение для кучи, но оно должно быть в десятки раз быстрее, чем вычисление корреляций снова и снова.

Ответ 2

Подход № 1

corr2_coeff_rowwise указывается, как сделать поэлементную корреляцию между строками. Мы могли бы сократить его до случая поэлементной корреляции между двумя столбцами. Таким образом, мы получим цикл, который использует corr2_coeff_rowwise. Затем мы попытаемся векторизовать его и увидеть, что в нем есть фрагменты, которые можно векторизовать:

  1. Получение средних значений со mean. Это может быть векторизовано с использованием единого фильтра.
  2. Следующим шагом было получение различий между этими средними значениями относительно скользящих элементов из входных массивов. Чтобы портировать на векторизованный, мы бы использовали broadcasting.

Остальные остаются pearsonr же, чтобы получить первый от двух выходов из pearsonr.

Чтобы получить второй вывод, вернемся к source code. Это должно быть просто, учитывая первый коэффициент вывода.

Итак, имея в виду, мы бы в конечном итоге что-то вроде этого -

import scipy.special as special
from scipy.ndimage import uniform_filter

def sliding_corr1(a,b,W):
    # a,b are input arrays; W is window length

    am = uniform_filter(a.astype(float),W)
    bm = uniform_filter(b.astype(float),W)

    amc = am[W//2:-W//2+1]
    bmc = bm[W//2:-W//2+1]

    da = a[:,None]-amc
    db = b[:,None]-bmc

    # Get sliding mask of valid windows
    m,n = da.shape
    mask1 = np.arange(m)[:,None] >= np.arange(n)
    mask2 = np.arange(m)[:,None] < np.arange(n)+W
    mask = mask1 & mask2
    dam = (da*mask)
    dbm = (db*mask)

    ssAs = np.einsum('ij,ij->j',dam,dam)
    ssBs = np.einsum('ij,ij->j',dbm,dbm)
    D = np.einsum('ij,ij->j',dam,dbm)
    coeff = D/np.sqrt(ssAs*ssBs)

    n = W
    ab = n/2 - 1
    pval = 2*special.btdtr(ab, ab, 0.5*(1 - abs(np.float64(coeff))))
    return coeff,pval

Таким образом, чтобы получить окончательный вывод от входов из серии панд -

out = sliding_corr1(df['x'].to_numpy(copy=False),df['y'].to_numpy(copy=False),50)

Подход № 2

Очень похоже на Approach #1, но мы будем использовать numba для повышения эффективности памяти, чтобы заменить шаг № 2 от предыдущего подхода.

from numba import njit
import math

@njit(parallel=True)
def sliding_corr2_coeff(a,b,amc,bmc):
    L = len(a)-W+1
    out00 = np.empty(L)
    for i in range(L):
        out_a = 0
        out_b = 0
        out_D = 0
        for j in range(W):
            d_a = a[i+j]-amc[i]
            d_b = b[i+j]-bmc[i]
            out_D += d_a*d_b
            out_a += d_a**2
            out_b += d_b**2
        out00[i] = out_D/math.sqrt(out_a*out_b)
    return out00

def sliding_corr2(a,b,W):
    am = uniform_filter(a.astype(float),W)
    bm = uniform_filter(b.astype(float),W)

    amc = am[W//2:-W//2+1]
    bmc = bm[W//2:-W//2+1]

    coeff = sliding_corr2_coeff(a,b,amc,bmc)

    ab = W/2 - 1
    pval = 2*special.btdtr(ab, ab, 0.5*(1 - abs(np.float64(coeff))))
    return coeff,pval

Подход № 3

Очень похоже на предыдущее, за исключением того, что мы numba всю работу с коэффициентами на numba -

@njit(parallel=True)
def sliding_corr3_coeff(a,b,W):
    L = len(a)-W+1
    out00 = np.empty(L)
    for i in range(L):
        a_mean = 0.0
        b_mean = 0.0
        for j in range(W):
            a_mean += a[i+j]
            b_mean += b[i+j]
        a_mean /= W
        b_mean /= W

        out_a = 0
        out_b = 0
        out_D = 0
        for j in range(W):
            d_a = a[i+j]-a_mean
            d_b = b[i+j]-b_mean
            out_D += d_a*d_b
            out_a += d_a*d_a
            out_b += d_b*d_b
        out00[i] = out_D/math.sqrt(out_a*out_b)
    return out00

def sliding_corr3(a,b,W):    
    coeff = sliding_corr3_coeff(a,b,W)
    ab = W/2 - 1
    pval = 2*special.btdtr(ab, ab, 0.5*(1 - np.abs(coeff)))
    return coeff,pval

Сроки -

In [181]: df = pd.DataFrame({'x': np.random.rand(10000), 'y': np.random.rand(10000)})

In [182]: %timeit sliding_corr2(df['x'].to_numpy(copy=False),df['y'].to_numpy(copy=False),50)
100 loops, best of 3: 5.05 ms per loop

In [183]: %timeit sliding_corr3(df['x'].to_numpy(copy=False),df['y'].to_numpy(copy=False),50)
100 loops, best of 3: 5.51 ms per loop

Замечания:

  • Похоже, что для этого набора данных для sliding_corr1 требуется много времени, и, скорее всего, из-за потребности в памяти, sliding_corr1 с его шагом № 2.

  • Узкое место после использования функций numba, а затем переносится в вычисление p-val с помощью special.btdtr.