我有一个大的NumPy数组nodes = np.arange(100_000_000)
,我需要通过重新排列这个数组
- 记录然后删除数组中的中间值
- 将阵列拆分为
left
半部分和right
半部分 - 对每一半重复步骤1-2
- 当所有值用完时停止
因此,对于较小的输入示例nodes = np.arange(10)
,输出为:
[5 2 8 1 4 7 9 0 3 6]
这是通过天真地做:
import numpy as np
def split(node, out):
mid = len(node) // 2
out.append(node[mid])
return node[:mid], node[mid+1:]
def reorder(a):
nodes = [a.tolist()]
out = []
while nodes:
tmp = []
for node in nodes:
for n in split(node, out):
if n:
tmp.append(n)
nodes = tmp
return np.array(out)
if __name__ == "__main__":
nodes = np.arange(10)
print(reorder(nodes))
然而,这对于nodes = np.arange(100_000_000)
来说太慢了,所以我正在寻找一个更快的解决方案。
您可以通过处理切片组,用Numpy向量化函数。
这里有一个实现:
# Similar to [e for tmp in zip(a, b) for e in tmp] ,
# but on Numpy arrays and much faster
def interleave(a, b):
assert len(a) == len(b)
return np.column_stack((a, b)).reshape(len(a) * 2)
# n is the length of the input range (len(a) in your example)
def fast_reorder(n):
if n == 0:
return np.empty(0, dtype=np.int32)
startSlices = np.array([0], dtype=np.int32)
endSlices = np.array([n], dtype=np.int32)
allMidSlices = np.empty(n, dtype=np.int32) # Similar to "out" in your implementation
midInsertCount = 0 # Actual size of allMidSlices
# Generate a bunch of middle values as long as there is valid slices to split
while midInsertCount < n:
# Generate the new mid/left/right slices
midSlices = (endSlices + startSlices) // 2
# Computing the next slices is not needed for the last step
if midInsertCount + len(midSlices) < n:
# Generate the nexts slices (possibly with invalid ones)
newStartSlices = interleave(startSlices, midSlices+1)
newEndSlices = interleave(midSlices, endSlices)
# Discard invalid slices
isValidSlices = newStartSlices < newEndSlices
startSlices = newStartSlices[isValidSlices]
endSlices = newEndSlices[isValidSlices]
# Fast appending
allMidSlices[midInsertCount:midInsertCount+len(midSlices)] = midSlices
midInsertCount += len(midSlices)
return allMidSlices[0:midInsertCount]
在我的机器上,这比您的标量实现快89倍,输入np.arange(100_000_000)
从2min35下降到1.75s。它还消耗更少的内存(大约少3~4倍(。请注意,如果您想要更快的代码,那么您可能需要使用像C或C++这样的本地语言。
编辑:这个问题已经更新为有一个小得多的输入数组,所以出于历史原因,我留下下面的内容。基本上,这可能是一个拼写错误,但我们经常习惯于计算机处理超大的数字,当涉及内存时,它们可能会成为一个真正的问题。
其他人已经提交了一个基于numpy的解决方案,我认为它符合要求。
您的代码需要大量的RAM才能容纳1000亿个64位整数。你有800GB的RAM吗?然后将numpy数组转换为一个列表,该列表将大大大于数组(numpy数组中的每个压缩的64位int将成为一个内存效率低得多的python int对象,并且该列表将有一个指向该对象的指针(。然后,您对列表进行大量切片,这些切片不会复制数据,但会复制指向数据的指针,并使用更多的RAM。您还可以将所有结果值附加到一个列表中,每次一个值。一般来说,列表添加项目的速度非常快,但如果列表的大小如此之大,不仅速度会很慢,而且列表的分配方式可能会非常浪费RAM,并导致重大问题(我相信,当列表达到一定的满度时,它们的大小会翻倍,因此您最终会分配比您需要的更多的RAM,并进行许多分配和可能的复制(。你在什么机器上运行这个?有很多方法可以改进你的代码,但除非你在超级计算机上运行,否则我不知道你是否能完成计算。我只是。。只有有32GB的RAM,我甚至不会尝试创建100B的int_64 numpy阵列,因为我不想为大量的虚拟内存耗尽ssd写入寿命。
至于改进代码,请坚持使用numpy数组,不要更改为python列表,这将大大增加您所需的RAM。预先分配一个numpy数组来输入答案。然后你需要一个新的算法。任何递归或递归的东西(即分割输入的循环(都需要跟踪大量状态,你的节点列表将非常庞大,并且再次使用大量RAM。您可以使用len(a(来指示从列表中删除的值,并每次扫描整个阵列以确定下一步该做什么,但这将节省RAM,有利于搜索巨大的阵列。我觉得有一种算法可以从每一端截取数字,并将其放在输出中,只跟踪开始和结束,但我还没有弄清楚,至少还没有。
我还认为有一种更简单的算法,你只需跟踪你所做的分割次数,而不是制作一个巨大的切片列表并将其全部保存在内存中。取左半部分的中间,然后取右半部分的中部,然后向上数一,当你取左半一半的中部时,你知道你必须跳到右半部分,然后数为一,所以你跳到原来右半部分的左半部分,一直跳……根据半部分的深度和输入的长度,你应该可以在不扫描或追踪所有这些切片,尽管我没能花太多时间在脑子里思考这个问题。
对于这种性质的问题,如果你真的需要突破极限,你应该考虑使用C/C++,这样你就可以尽可能高效地使用RAM,因为你正在做大量的小事,这些小事与python的性能不太匹配。