稀疏 CSR 数组的核外处理



如何使用Python在磁盘上保存的稀疏CSR数组的块上并行应用某些函数?可以按顺序完成此操作,例如,通过保存 CSR 数组,joblib.dumpjoblib.load(.., mmap_mode="r")打开它并逐个处理行块。 使用 dask 可以更有效地完成此操作吗?

特别是,假设不需要对稀疏数组进行所有可能的核心操作,而只需要并行加载行块(每个块都是一个 CSR 数组(并对其应用一些功能(在我的情况下,例如estimator.predict(X)来自scikit-learn(。

此外,磁盘上是否有适合此任务的文件格式? Joblib 可以工作,但我不确定作为内存映射加载的 CSR 数组的(并行(性能;spark.mllib似乎使用了一些自定义的稀疏存储格式(似乎没有纯 Python 解析器(或 LIBSVM 格式(scikit-learn 中的解析器,根据我的经验,比joblib.dump慢得多(......

注意:我已经阅读了文档,https://github.com/dask/dask/上有关它的各种问题,但我仍然不确定如何最好地解决这个问题。

编辑:为了给出一个更实际的例子,下面是在 dask 中适用于密集数组的代码,但在使用稀疏数组时失败并出现此错误,

import numpy as np
import scipy.sparse
import joblib
import dask.array as da
from sklearn.utils import gen_batches
np.random.seed(42)
joblib.dump(np.random.rand(100000, 1000), 'X_dense.pkl')
joblib.dump(scipy.sparse.random(10000, 1000000, format='csr'), 'X_csr.pkl')
fh = joblib.load('X_dense.pkl', mmap_mode='r')
# computing the results without dask
results = np.vstack((fh[sl, :].sum(axis=1)) for sl in gen_batches(fh.shape[0], batch_size))
# computing the results with dask
x = da.from_array(fh, chunks=(2000))
results = x.sum(axis=1).compute()

Edit2:按照下面的讨论,下面的示例克服了上一个错误,但得到了关于dask/array/core.py:L3413IndexError: tuple index out of range的错误,

import dask
# +imports from the example above
dask.set_options(get=dask.get)  # disable multiprocessing
fh = joblib.load('X_csr.pkl', mmap_mode='r')
def func(x):
if x.ndim == 0:
# dask does some heuristics with dummy data, if the x is a 0d array
# the sum command would fail
return x
res = np.asarray(x.sum(axis=1, keepdims=True))
return res
Xd = da.from_array(fh, chunks=(2000))
results_new = Xd.map_blocks(func).compute()

所以我对joblib或dask一无所知,更不用说你的应用程序特定的数据格式了。但实际上可以分块从磁盘读取稀疏矩阵,同时保留稀疏数据结构。

虽然维基百科关于CSR格式的文章很好地解释了它是如何工作的,但我会做一个简短的回顾:

一些稀疏矩阵,例如:

1 0 2
0 0 3
4 5 6

通过记住每个非零值及其所在的列来存储:

sparse.data    = 1 2 3 4 5 6  # acutal value
sparse.indices = 0 2 2 0 1 2  # number of column (0-indexed)

现在我们仍然缺少行。压缩格式仅存储每行中有多少个非零值,而不是存储每个值行。

请注意,非零计数也是累积的,因此以下数组包含直到并包括此行的非零值的数量。更复杂的是,数组总是以0开头,因此包含num_rows+1条目:

sparse.indptr = 0 2 3 6

所以直到并包括第二行有 3 个非零值,即123

既然我们解决了这个问题,我们就可以开始"切片"矩阵了。目标是为一些块构建dataindicesindptr数组。假设原始的巨大矩阵存储在三个二进制文件中,我们将逐步读取。我们使用生成器来重复yield一些块。

为此,我们需要知道每个块中有多少个非零值,并读取相应的值和列索引数量。非零计数可以方便地从 indptr 数组中读取。这是通过从与所需块大小相对应的巨大indptr文件中读取一定数量的条目来实现的。indptr文件该部分的最后一个条目减去之前非零值的数量,给出了该块中的非零值的数量。因此,块dataindices数组只是从大dataindices文件中切片的。indptr数组需要人为地加上零(这就是格式想要的,不要问我:D(。

然后我们可以构造一个data块、indicesindptr的稀疏矩阵,得到一个新的稀疏矩阵。

必须注意的是,实际的矩阵大小不能直接从三个数组中重建。它要么是矩阵的最大列索引,要么是运气不好并且块中没有未确定的数据。所以我们还需要传递列计数。

我可能以一种相当复杂的方式解释了事情,所以只需将其作为不透明的代码段阅读,它实现了这样一个生成器:

import numpy as np
import scipy.sparse

def gen_batches(batch_size, sparse_data_path, sparse_indices_path, 
sparse_indptr_path, dtype=np.float32, column_size=None):
data_item_size = dtype().itemsize
with open(sparse_data_path, 'rb') as data_file, 
open(sparse_indices_path, 'rb') as indices_file, 
open(sparse_indptr_path, 'rb') as indptr_file:
nnz_before = np.fromstring(indptr_file.read(4), dtype=np.int32)
while True:
indptr_batch = np.frombuffer(nnz_before.tobytes() +
indptr_file.read(4*batch_size), dtype=np.int32)
if len(indptr_batch) == 1:
break
batch_indptr = indptr_batch - nnz_before
nnz_before = indptr_batch[-1]
batch_nnz = np.asscalar(batch_indptr[-1])
batch_data = np.frombuffer(data_file.read(
data_item_size * batch_nnz), dtype=dtype)
batch_indices = np.frombuffer(indices_file.read(
4 * batch_nnz), dtype=np.int32)
dimensions = (len(indptr_batch)-1, column_size)
matrix = scipy.sparse.csr_matrix((batch_data, 
batch_indices, batch_indptr), shape=dimensions)
yield matrix

if __name__ == '__main__':
sparse = scipy.sparse.random(5, 4, density=0.1, format='csr', dtype=np.float32)
sparse.data.tofile('sparse.data')        # dtype as specified above  ^^^^^^^^^^
sparse.indices.tofile('sparse.indices')  # dtype=int32
sparse.indptr.tofile('sparse.indptr')    # dtype=int32
print(sparse.toarray())
print('========')
for batch in gen_batches(2, 'sparse.data', 'sparse.indices', 
'sparse.indptr', column_size=4):
print(batch.toarray())

numpy.ndarray.tofile()只存储二进制数组,所以你需要记住数据格式。scipy.sparse表示indicesindptrint32,所以这是总矩阵大小的限制。

我还对代码进行了基准测试,发现 scipy csr 矩阵构造函数是小矩阵的瓶颈。您的里程可能会有所不同,这只是"原理证明"。

如果需要更复杂的实现,或者有些东西太晦涩难懂,请打我:)

最新更新