Ответ 1
Ну, это кажется невероятным, но на самом деле нет способа извлечь такую информацию из разложения PCA (по крайней мере, от Spark 1.5). Но опять же, было много подобных "жалоб" - см. здесь, например, за невозможность извлечь лучшие параметры из CrossValidatorModel
.
К счастью, несколько месяцев назад я посетил "Масштабируемое машинное обучение" MOOC by AMPLab (Berkeley) и Databricks, т.е. создатели Spark, где мы выполнили полный протокол PCA "вручную" в рамках домашних заданий. Я изменил свои функции с тех пор (будьте уверены, я получил полный кредит:-), чтобы работать с dataframes как входы (вместо RDD), того же формата, что и ваш (т.е. строки DenseVectors
, содержащие числовые особенности).
Сначала нам нужно определить промежуточную функцию estimatedCovariance
следующим образом:
import numpy as np
def estimateCovariance(df):
"""Compute the covariance matrix for a given dataframe.
Note:
The multi-dimensional covariance array should be calculated using outer products. Don't
forget to normalize the data by first subtracting the mean.
Args:
df: A Spark dataframe with a column named 'features', which (column) consists of DenseVectors.
Returns:
np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
length of the arrays in the input dataframe.
"""
m = df.select(df['features']).map(lambda x: x[0]).mean()
dfZeroMean = df.select(df['features']).map(lambda x: x[0]).map(lambda x: x-m) # subtract the mean
return dfZeroMean.map(lambda x: np.outer(x,x)).sum()/df.count()
Затем мы можем написать основную функцию pca
следующим образом:
from numpy.linalg import eigh
def pca(df, k=2):
"""Computes the top `k` principal components, corresponding scores, and all eigenvalues.
Note:
All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
each eigenvectors as a column. This function should also return eigenvectors as columns.
Args:
df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
k (int): The number of principal components to return.
Returns:
tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of
scores, eigenvalues). Eigenvectors is a multi-dimensional array where the number of
rows equals the length of the arrays in the input `RDD` and the number of columns equals
`k`. The `RDD` of scores has the same number of rows as `data` and consists of arrays
of length `k`. Eigenvalues is an array of length d (the number of features).
"""
cov = estimateCovariance(df)
col = cov.shape[1]
eigVals, eigVecs = eigh(cov)
inds = np.argsort(eigVals)
eigVecs = eigVecs.T[inds[-1:-(col+1):-1]]
components = eigVecs[0:k]
eigVals = eigVals[inds[-1:-(col+1):-1]] # sort eigenvals
score = df.select(df['features']).map(lambda x: x[0]).map(lambda x: np.dot(x, components.T) )
# Return the `k` principal components, `k` scores, and all eigenvalues
return components.T, score, eigVals
Test
Посмотрите сначала результаты с существующим методом, используя данные примера из документации Spark ML PCA (модифицируя их так, чтобы быть всеми DenseVectors
):
from pyspark.ml.feature import *
from pyspark.mllib.linalg import Vectors
data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = sqlContext.createDataFrame(data,["features"])
pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
model = pca_extracted.fit(df)
model.transform(df).collect()
[Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), pca_features=DenseVector([1.6486, -4.0133])),
Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), pca_features=DenseVector([-4.6451, -1.1168])),
Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]), pca_features=DenseVector([-6.4289, -5.338]))]
Затем с помощью нашего метода:
comp, score, eigVals = pca(df)
score.collect()
[array([ 1.64857282, 4.0132827 ]),
array([-4.64510433, 1.11679727]),
array([-6.42888054, 5.33795143])]
Подчеркнем, что мы не используем любые collect()
методы в определенных нами функциях - score
- это RDD
, как и должно быть.
Обратите внимание, что знаки нашего второго столбца противоположны признакам, полученным из существующего метода; но это не проблема: согласно (свободно загружаемому) Введение в статистическое обучение, в соавторстве с Хасти и Тиббирани, стр. 382
Каждый вектор загрузки основного компонента уникален, вплоть до знака. Эта означает, что два разных пакета программного обеспечения приведут к тому же принципу компонентные векторы загрузки, хотя признаки тех нагрузочных векторов может отличаться. Знаки могут различаться, поскольку каждая загрузка основного компонента вектор задает направление в p-мерном пространстве: переворачивание знака не имеет эффект, так как направление не меняется. [...] Точно так же векторы оценки уникальны до знака, так как дисперсия Z совпадает с дисперсией -Z.
Наконец, теперь, когда у нас есть собственные значения, тривиально написать функцию для процента объясненной дисперсии:
def varianceExplained(df, k=1):
"""Calculate the fraction of variance explained by the top `k` eigenvectors.
Args:
df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
k: The number of principal components to consider.
Returns:
float: A number between 0 and 1 representing the percentage of variance explained
by the top `k` eigenvectors.
"""
components, scores, eigenvalues = pca(df, k)
return sum(eigenvalues[0:k])/sum(eigenvalues)
varianceExplained(df,1)
# 0.79439325322305299
В качестве теста мы также проверяем, объясняется ли в нашем примере дисперсия 1,0, при k = 5 (поскольку исходные данные 5-мерные):
varianceExplained(df,5)
# 1.0
Это должно выполнять вашу работу эффективно; не стесняйтесь придумывать какие-либо разъяснения, которые могут вам понадобиться.
[Разработано и протестировано с помощью Spark 1.5.0 и 1.5.1]