下面的代码需要 12 秒才能num_iterations=1。
如何使其更快(包括使用多处理(?我希望能够为 num_iterations=1000 执行此操作。
由于列表结构的列表,我找不到一种明智的方法来使用多处理 - 这是必需的,因为每个箱中的元素数量是可变的。多处理/memmap显然不能很好地处理这种结构。
import time
import numpy as np
idx = np.random.randint(int(1e6),size=int(1e7))
output = [[] for i in range(int(1e6))]
num_iterations = 1
t1=time.time()
for i in range(num_iterations):
values = np.random.random(int(1e7))
time.sleep(5) # To account for the fact I need to load the data from disc.
for j,value in enumerate(values):
output[idx[j]].append(value)
print(time.time()-t1) # 12 seconds for num_iterations=1
需要注意的一件事是,每次将values
映射到output
时,为每个箱添加的条目数相同。因此,您可以预先分配具有正确大小的 numpy 数组,而不是附加到列表。
此外,您可以将每个箱的索引存储在数组中(每个箱一个数组(。由于您只需要为所有迭代创建一次这些索引数组,并且由于箱比值少得多,因此您可以使用以下方法从values
中提取出一个特定箱的所有新条目:
output_chunk = values[indices]
更快的是使用 numba(包含在 Anaconda Python 发行版中(进行即时 (jit( 编译。然后,您可以获得类似于 C 的性能,而无需编译和链接代码的所有麻烦。我不确定 numba 优化的所有细节,但我认为如果一个@njit
标记的功能只使用标量变量和 numpy 数组,而不是 Python 列表和字典,这是最有效的。
以下是三种实现的时间(对于比您的问题中更小的数组大小(:
%timeit -r2 -n5 f_reference(100)
486 ms ± 4.42 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)
%timeit -r2 -n50 f_vectorized(100)
73.6 ms ± 219 µs per loop (mean ± std. dev. of 2 runs, 50 loops each)
%timeit -r2 -n50 f_numba(100)
9.6 ms ± 40.9 µs per loop (mean ± std. dev. of 2 runs, 50 loops each)
以下是实现:
import numpy as np
import numba
np.random.seed(1)
n_vals = 10000 # 1e7
n_bins = 1000# 1e6
idx = np.random.randint(n_bins, size=n_vals)
def f_reference(n_iter):
np.random.seed(2)
output = [[] for i in range(n_bins)]
for i in range(n_iter):
values = np.random.random(n_vals) # simulate loading data
for j,value in enumerate(values):
output[idx[j]].append(value)
return [np.array(x) for x in output]
def f_vectorized(n_iter):
np.random.seed(2)
# each entry in bin_indices is an int array of indices into values
# that belong in the corresponding bin.
bin_indices = [np.where(idx==i)[0] for i in range(n_bins)]
bin_sizes = [len(bi) for bi in bin_indices]
output = [np.zeros(bs*n_iter) for bs in bin_sizes]
for i in range(n_iter):
values = np.random.random(n_vals) # simulate loading data
for jbin, (indices, bsize) in enumerate(zip(bin_indices, bin_sizes)):
output_chunk = values[indices]
output[jbin][i*bsize:(i+1)*bsize] = output_chunk
return output
@numba.njit
def _f_numba_chunk(i_chunk, idx, values, bin_sizes, bin_offsets, output_1):
"""Process one set of values (length n_vals).
Update corresponding n_vals in output_1 array (length n_vals*n_iter).
"""
# pointers to next entry for each bin, shape (n_bins,)
j_bin_out = bin_offsets[:-1] + bin_sizes*i_chunk
for i_bin, val in zip(idx, values):
output_1[j_bin_out[i_bin]] = val
j_bin_out[i_bin] += 1
def f_numba(n_iter):
np.random.seed(2)
bin_sizes, _ = np.histogram(idx, np.arange(n_bins+1)-0.5)
bin_offsets = np.concatenate(([0], np.cumsum(bin_sizes)))*n_iter
output_1 = np.empty(n_vals*n_iter)
for i in range(n_iter):
values = np.random.random(n_vals) # simulate loading data
_f_numba_chunk(i, idx, values, bin_sizes, bin_offsets, output_1)
# convert output_1 to list of arrays
output = [
output_1[bin_offsets[i]:bin_offsets[i+1]]
for i in range(n_bins)
]
return output
# test
out_ref = f_reference(5)
out_vec = f_vectorized(5)
out_numba = f_numba(5)
for oref, ovec, onum in zip(out_ref, out_vec, out_numba):
assert np.all(ovec == oref)
assert np.all(onum == oref)
在系数 50 加速的情况下,也许不再需要并行化它,但循环for i in range(n_iter)
内的部分可以使用multiprocessing.Pool
并行化;每个工作线程返回output_chunk
,顶级进程只需要将块存储到output
中。