我实现了代码来尝试在numpy数组中获得最大的出现次数。我很满意使用numba,但有局限性。我想知道是否可以将其改进为一般情况。
numba实施
import numba as nb
import numpy as np
import collections
@nb.njit("int64(int64[:])")
def max_count_unique_num(x):
"""
Counts maximum number of unique integer in x.
Args:
x (numpy array): Integer array.
Returns:
Int
"""
# get maximum value
m = x[0]
for v in x:
if v > m:
m = v
if m == 0:
return x.size
# count each unique value
num = np.zeros(m + 1, dtype=x.dtype)
for k in x:
num[k] += 1
# maximum count
m = 0
for k in num:
if k > m:
m = k
return m
为了比较,我还实现了numpy的unique
和collections.Counter
def np_unique(x):
""" Counts maximum occurrence using numpy's unique. """
ux, uc = np.unique(x, return_counts=True)
return uc.max()
def counter(x):
""" Counts maximum occurrence using collections.Counter. """
counts = collections.Counter(x)
return max(counts.values())
时间
编辑:根据@MechanicPig的建议,添加np.bincount
进行额外的比较。
In [1]: x = np.random.randint(0, 2000, size=30000).astype(np.int64)
In [2]: %timeit max_count_unique_num(x)
30 µs ± 387 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
In [3]: %timeit np_unique(x)
1.14 ms ± 1.65 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
In [4]: %timeit counter(x)
2.68 ms ± 33.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [5]: x = np.random.randint(0, 200000, size=30000).astype(np.int64)
In [6]: %timeit counter(x)
3.07 ms ± 40.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [7]: %timeit np_unique(x)
1.3 ms ± 7.35 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
In [8]: %timeit max_count_unique_num(x)
490 µs ± 1.47 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
In [9]: x = np.random.randint(0, 2000, size=30000).astype(np.int64)
In [10]: %timeit np.bincount(x).max()
32.3 µs ± 250 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
In [11]: x = np.random.randint(0, 200000, size=30000).astype(np.int64)
In [12]: %timeit np.bincount(x).max()
830 µs ± 6.09 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
numba实现的局限性效率仅在x
均为小正int
时才显著降低,当int
非常大时效率显著降低;不适用float
及负值.
我有什么办法可以推广实现并保持速度?
更新
检查np.unique
的源代码后,一般情况下的实现可以是:
@nb.njit(["int64(int64[:])", "int64(float64[:])"])
def max_count_unique_num_2(x):
x.sort()
n = 0
k = 0
x0 = x[0]
for v in x:
if x0 == v:
k += 1
else:
if k > n:
n = k
k = 1
x0 = v
# for last item in x if it equals to previous one
if k > n:
n = k
return n
timeit
In [154]: x = np.random.randint(0, 200000, size=30000).astype(np.int64)
In [155]: %timeit max_count_unique_num(x)
519 µs ± 5.33 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
In [156]: %timeit np_unique(x)
1.3 ms ± 9.88 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
In [157]: %timeit max_count_unique_num_2(x)
240 µs ± 1.92 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
In [158]: x = np.random.randint(0, 200000, size=300000).astype(np.int64)
In [159]: %timeit max_count_unique_num(x)
1.01 ms ± 7.2 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
In [160]: %timeit np_unique(x)
18.1 ms ± 395 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [161]: %timeit max_count_unique_num_2(x)
3.58 ms ± 28.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
:
- 如果
x
为大整数且大小不大,则max_count_unique_num_2
优于max_count_unique_num
。 max_count_unique_num
和max_count_unique_num_2
均明显快于np.unique
。max_count_unique_num_2
的小修改可以返回出现次数最多的项目,甚至所有项目都有相同的最大出现次数。max_count_unique_num_2
甚至可以通过删除x.sort()
来加速x
本身的排序。
如果缩短代码:
@nb.njit("int64(int64[:])", fastmath=True)
def shortened(x):
num = np.zeros(x.max() + 1, dtype=x.dtype)
for k in x:
num[k] += 1
return num.max()
或平行的:
@nb.njit("int64(int64[:])", parallel=True, fastmath=True)
def shortened_paralleled(x):
num = np.zeros(x.max() + 1, dtype=x.dtype)
for k in nb.prange(x.size):
num[x[k]] += 1
return num.max()
并行化将优于更大的数据量。请注意,并行在某些运行中会得到不同的结果,如果可能的话,需要修复。
使用Numba处理浮点数(或负值):
@nb.njit("int8(float64[:])", fastmath=True)
def shortened_float(x):
num = np.zeros(x.size, dtype=np.int8)
for k in x:
for j in range(x.shape[0]):
if k == x[j]:
num[j] += 1
return num.max()
IMO,np.unique(x, return_counts=True)[1].max()
是处理整数和浮点数的最佳选择,在非常快的实现中。Numba处理整数的速度更快(这取决于数据大小,因为数据大小越大,性能越弱;AIK,这是由于循环本能而不是数组),但对于浮点数,代码必须在性能方面进行优化;但是我不认为Numba可以打败NumPy unique
,特别是当我们面对大数据时.
注意:np.bincount
只能处理整数.
您也可以不使用numpy。
arr = [1,1,2,2,3,3,4,5,6,1,3,5,7,1]
counts = list(map(list(arr).count, set(arr)))
list(set(arr))[counts.index(max(counts))]
如果你想使用numpy,那么试试这个,
arr = np.array([1,1,2,2,3,3,4,5,6,1,3,5,7,1])
uniques, counts = np.unique(arr, return_counts = True)
uniques[np.where(counts == counts.max())]
两者做完全相同的工作。要检查哪种方法更有效,只需执行以下操作:
time_i = time.time()
<arr declaration> # Creating a new array each iteration can cause the total time to increase which would be biased against the numpy method.
for i in range(10**5):
<method you want>
time_f = time.time()
当我运行这个时,第一种方法得到0.39秒,第二种方法得到2.69秒。因此,可以肯定地说,第一种方法更有效。
我想说的是,你的实现几乎与numpy.bincount
相同。如果您想让它通用,您可以考虑对原始数据进行编码:
def encode(ar):
# Equivalent to numpy.unique(ar, return_inverse=True)[1] when ar.ndim == 1
flatten = ar.ravel()
perm = flatten.argsort()
sort = flatten[perm]
mask = np.concatenate(([False], sort[1:] != sort[:-1]))
encoded = np.empty(sort.shape, np.int64)
encoded[perm] = mask.cumsum()
encoded.shape = ar.shape
return encoded
def count_max(ar):
return max_count_unique_num(encode(ar))