Внекорневая обработка редких массивов CSR
Как можно применить некоторую функцию параллельно на кусках разреженного массива CSR, сохраненного на диске с помощью Python? Последовательно это можно сделать, например, сохраняя массив CSR с joblib.dump
, открывая его с помощью joblib.load(.., mmap_mode="r")
и обрабатывая куски строк один за другим. Это можно сделать более эффективно с помощью dask?
В частности, если предположить, что для всех разрешенных основных операций на разреженных массивах не требуется всевозможных операций, а просто возможность параллельно загружать куски строк (каждый фрагмент представляет собой массив CSR) и применять к ним некоторую функцию (в моем случай, например, estimator.predict(X)
из scikit-learn).
Кроме того, есть ли формат файла на диске, который подходит для этой задачи? joblib работает, но я не уверен в (параллельной) производительности массивов CSR, загружаемых в карты памяти; Кажется, что spark.mllib использует либо некоторый пользовательский разреженный формат хранения (который, кажется, не имеет чистого анализатора Python), либо формат LIBSVM (парсер в scikit-learn, по моему опыту, намного медленнее, чем joblib.dump
)..
Примечание. Я прочитал http://dask.pydata.org/en/latest/array-sparse.html, различные вопросы об этом на https://github.com/dask/dask/, но я все еще не уверен, как наилучшим образом подойти к этой проблеме.
Изменить:, чтобы дать более практичный пример, ниже приведен код, который работает в dask для плотных массивов, но не работает при использовании разреженных массивов с эта ошибка,
import numpy as np
import scipy.sparse
import joblib
import dask.array as da
from sklearn.utils import gen_batches
np.random.seed(42)
joblib.dump(np.random.rand(100000, 1000), 'X_dense.pkl')
joblib.dump(scipy.sparse.random(10000, 1000000, format='csr'), 'X_csr.pkl')
fh = joblib.load('X_dense.pkl', mmap_mode='r')
# computing the results without dask
results = np.vstack((fh[sl, :].sum(axis=1)) for sl in gen_batches(fh.shape[0], batch_size))
# computing the results with dask
x = da.from_array(fh, chunks=(2000))
results = x.sum(axis=1).compute()
Edit2:, следуя приведенному ниже обсуждению, приведенный ниже пример преодолевает предыдущую ошибку, но получает около IndexError: tuple index out of range
в dask/array/core.py:L3413
,
import dask
# +imports from the example above
dask.set_options(get=dask.get) # disable multiprocessing
fh = joblib.load('X_csr.pkl', mmap_mode='r')
def func(x):
if x.ndim == 0:
# dask does some heuristics with dummy data, if the x is a 0d array
# the sum command would fail
return x
res = np.asarray(x.sum(axis=1, keepdims=True))
return res
Xd = da.from_array(fh, chunks=(2000))
results_new = Xd.map_blocks(func).compute()