在 numpy 中高效分配箱



我有一个非常大的 1D python 数组x有些重复的数字,还有一些相同大小的数据d

x = np.array([48531, 62312, 23345, 62312, 1567, ..., 23345, 23345])
d = np.array([0    , 1    , 2    , 3    , 4   , ..., 99998, 99999])

在我的上下文中,"非常大"是指 10k...100k 个条目。其中一些是重复的,因此唯一条目的数量约为 5k......15k。

我想将它们分组到垃圾箱中。这应该通过创建两个对象来完成。一个是矩阵缓冲区,b从 d 获取的数据项。另一个对象是每个缓冲区列引用的唯一 x 值的向量v。下面是示例:

v =  [48531, 62312, 23345, 1567, ...]
b = [[0    , 1    , 2    , 4   , ...]
[X    , 3    , ....., ...., ...]
[ ...., ....., ....., ...., ...]
[X    , X    , 99998, X   , ...]
[X    , X    , 99999, X   , ...] ]

由于 x 中每个唯一数的出现次数不同,缓冲区 b 中的一些值是无效的(由大写X表示,即"不在乎")。


在 numpy 中导出 v 非常容易:

v, n = np.unique(x, return_counts=True)  # yay, just 5ms

我们甚至得到n这是 B 中每列内有效条目的数量。此外,(np.max(n), v.shape[0])返回需要分配的矩阵 b 的形状。

但是如何有效地生成b呢? for循环可能会有所帮助

b = np.zeros((np.max(n), v.shape[0]))
for i in range(v.shape[0]):
idx = np.flatnonzero(x == v[i])
b[0:n[i], i] = d[idx]

此循环遍历 b 的所有列,并通过标识x == v的所有位置来提取idx索引。

但是我不喜欢该解决方案,因为 for 循环相当慢(比唯一命令长约 50 倍)。我宁愿对操作进行矢量化。


因此,一种矢量化方法是创建一个索引矩阵,其中x == v,然后沿着列对其运行nonzero()命令。 但是,此矩阵需要 150K x 15K 范围内的内存,因此在 32 位系统上约为 8GB。

对我来说,np.unique-操作甚至可以有效地返回倒排索引,以便x = v[inv_indices],但无法获取 v 中每个箱的 v 到 x 分配列表,这听起来相当愚蠢。当函数通过 x 扫描时,这应该几乎是免费的。在实现方面,唯一的挑战是生成的索引矩阵的大小未知。


假设 np.unique-command 是用于装箱的方法,则解决此问题的另一种措辞方式:

给定三个数组x, v, inv_indices其中vx中的唯一元素,x = v[inv_indices]是否有一种有效的方法来生成索引向量v_to_x[i],以便i所有箱的all(v[i] == x[v_to_x[i]])

我不应该花更多的时间去研究np.unique-command本身。我很乐意为每个箱中的项目数量提供一个上限(例如 50)。

基于我编写此代码@user202729的建议

x_sorted_args = np.argsort(x)
x_sorted = x[x_sorted_args]
i = 0
v = -np.ones(T)
b = np.zeros((K, T))
for k,g in groupby(enumerate(x_sorted), lambda tup: tup[1]):
groups = np.array(list(g))[:,0]
size = groups.shape[0]
v[i] = k
b[0:size, i] = d[x_sorted_args[groups]]
i += 1

在大约 ~100ms 的运行中,这导致上面发布的原始代码的一些相当大的加速。

它首先枚举x中的值,添加相应的索引信息。然后枚举按实际x值分组,该值实际上是enumerate()生成的元组的第二个值。

for 循环遍历所有组,将元组的迭代器g转换为大小为(size x 2)groups矩阵,然后丢弃第二列,即仅保留索引的x值。这导致groups只是一个一维数组。

groupby()仅适用于排序数组。


干得好。我只是想知道我们是否可以做得更好?似乎仍然发生了很多不合理的数据复制。创建一个元组列表,然后将其转换为 2D 矩阵只是为了扔掉其中的一半仍然感觉有点次优。

我通过改写问题得到了我正在寻找的答案,请参阅此处: python:矢量化累积计数

通过"累积计数"返回的inv_indicesnp.unique()我们接收稀疏矩阵的数组索引,以便

c = cumcount(inv_indices)
b[inv_indices, c] = d

上面链接的线程中提出的累积计数非常有效。运行时间低于20ms是非常现实的。

最新更新