Внекорневая обработка редких массивов CSR

Как можно применить некоторую функцию параллельно на кусках разреженного массива CSR, сохраненного на диске с помощью Python? Последовательно это можно сделать, например, сохраняя массив CSR с joblib.dump, открывая его с помощью joblib.load(.., mmap_mode="r") и обрабатывая куски строк один за другим. Это можно сделать более эффективно с помощью dask?

В частности, если предположить, что для всех разрешенных основных операций на разреженных массивах не требуется всевозможных операций, а просто возможность параллельно загружать куски строк (каждый фрагмент представляет собой массив CSR) и применять к ним некоторую функцию (в моем случай, например, estimator.predict(X) из scikit-learn).

Кроме того, есть ли формат файла на диске, который подходит для этой задачи? joblib работает, но я не уверен в (параллельной) производительности массивов CSR, загружаемых в карты памяти; Кажется, что spark.mllib использует либо некоторый пользовательский разреженный формат хранения (который, кажется, не имеет чистого анализатора Python), либо формат LIBSVM (парсер в scikit-learn, по моему опыту, намного медленнее, чем joblib.dump)..

Примечание. Я прочитал http://dask.pydata.org/en/latest/array-sparse.html, различные вопросы об этом на https://github.com/dask/dask/, но я все еще не уверен, как наилучшим образом подойти к этой проблеме.

Изменить:, чтобы дать более практичный пример, ниже приведен код, который работает в dask для плотных массивов, но не работает при использовании разреженных массивов с эта ошибка,

import numpy as np
import scipy.sparse

import joblib
import dask.array as da
from sklearn.utils import gen_batches

np.random.seed(42)
joblib.dump(np.random.rand(100000, 1000), 'X_dense.pkl')
joblib.dump(scipy.sparse.random(10000, 1000000, format='csr'), 'X_csr.pkl')

fh = joblib.load('X_dense.pkl', mmap_mode='r')

# computing the results without dask
results = np.vstack((fh[sl, :].sum(axis=1)) for sl in gen_batches(fh.shape[0], batch_size))

# computing the results with dask
x = da.from_array(fh, chunks=(2000))
results = x.sum(axis=1).compute()

Edit2:, следуя приведенному ниже обсуждению, приведенный ниже пример преодолевает предыдущую ошибку, но получает около IndexError: tuple index out of range в dask/array/core.py:L3413,

import dask
# +imports from the example above
dask.set_options(get=dask.get)  # disable multiprocessing

fh = joblib.load('X_csr.pkl', mmap_mode='r')

def func(x):
    if x.ndim == 0:
        # dask does some heuristics with dummy data, if the x is a 0d array
        # the sum command would fail
        return x
    res = np.asarray(x.sum(axis=1, keepdims=True))
    return res

Xd = da.from_array(fh, chunks=(2000))
results_new = Xd.map_blocks(func).compute()

Ответы