Python: Get the value that occurs most often in an array



I implemented code that finds the highest occurrence count in a NumPy array. I am happy with the Numba version, but it has limitations. I would like to know whether it can be improved to handle the general case.

Numba implementation

import numba as nb
import numpy as np
import collections
@nb.njit("int64(int64[:])")
def max_count_unique_num(x):
    """
    Counts the maximum occurrence among the unique integers in x.

    Args:
        x (numpy array): Integer array.

    Returns:
        Int
    """
    # get maximum value
    m = x[0]
    for v in x:
        if v > m:
            m = v
    if m == 0:
        return x.size
    # count each unique value
    num = np.zeros(m + 1, dtype=x.dtype)
    for k in x:
        num[k] += 1
    # maximum count
    m = 0
    for k in num:
        if k > m:
            m = k
    return m

For comparison, I also implemented versions using numpy's unique and collections.Counter:

def np_unique(x):
    """ Counts maximum occurrence using numpy's unique. """
    ux, uc = np.unique(x, return_counts=True)
    return uc.max()

def counter(x):
    """ Counts maximum occurrence using collections.Counter. """
    counts = collections.Counter(x)
    return max(counts.values())

Timings

Edit: added np.bincount for an extra comparison, as suggested by @MechanicPig.

In [1]: x = np.random.randint(0, 2000, size=30000).astype(np.int64)
In [2]: %timeit max_count_unique_num(x)
30 µs ± 387 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
In [3]: %timeit np_unique(x)
1.14 ms ± 1.65 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
In [4]: %timeit counter(x)
2.68 ms ± 33.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [5]: x = np.random.randint(0, 200000, size=30000).astype(np.int64)
In [6]: %timeit counter(x)
3.07 ms ± 40.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [7]: %timeit np_unique(x)
1.3 ms ± 7.35 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
In [8]: %timeit max_count_unique_num(x)
490 µs ± 1.47 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
In [9]: x = np.random.randint(0, 2000, size=30000).astype(np.int64)
In [10]: %timeit np.bincount(x).max()
32.3 µs ± 250 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
In [11]: x = np.random.randint(0, 200000, size=30000).astype(np.int64)
In [12]: %timeit np.bincount(x).max()
830 µs ± 6.09 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

The limitation of the Numba implementation: it is only efficient when x contains small positive integers; it slows down significantly when the integers are very large, and it does not handle floats or negative values at all.

Is there a way to generalize the implementation while keeping its speed?


Update
After checking the source code of np.unique, an implementation for the general case can be:

@nb.njit(["int64(int64[:])", "int64(float64[:])"])
def max_count_unique_num_2(x):
    x.sort()
    n = 0
    k = 0
    x0 = x[0]
    for v in x:
        if x0 == v:
            k += 1
        else:
            if k > n:
                n = k
            k = 1
            x0 = v
    # for the last item in x if it equals the previous one
    if k > n:
        n = k
    return n

timeit

In [154]: x = np.random.randint(0, 200000, size=30000).astype(np.int64)
In [155]: %timeit max_count_unique_num(x)
519 µs ± 5.33 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
In [156]: %timeit np_unique(x)
1.3 ms ± 9.88 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
In [157]: %timeit max_count_unique_num_2(x)
240 µs ± 1.92 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
In [158]: x = np.random.randint(0, 200000, size=300000).astype(np.int64)
In [159]: %timeit max_count_unique_num(x)
1.01 ms ± 7.2 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
In [160]: %timeit np_unique(x)
18.1 ms ± 395 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [161]: %timeit max_count_unique_num_2(x)
3.58 ms ± 28.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Notes:

  1. If x contains large integers but the array itself is not large, max_count_unique_num_2 outperforms max_count_unique_num.
  2. Both max_count_unique_num and max_count_unique_num_2 are clearly faster than np.unique.
  3. With a small modification, max_count_unique_num_2 can return the most frequent item itself, or even all items that share the maximum count (see the sketch after this list).
  4. max_count_unique_num_2 can even be sped up further by removing x.sort() when x is already sorted.
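For note 3, a minimal sketch of such a modification (my own variant, not part of the original answer; the name most_frequent_items and the two-pass layout are assumptions): sort a copy, find the longest run of equal values, then collect every value whose run length matches it.

@nb.njit
def most_frequent_items(x):
    # Sort a copy so the caller's array is left untouched.
    xs = np.sort(x)
    # First pass: length of the longest run of equal values.
    best = 1
    k = 1
    for i in range(1, xs.size):
        if xs[i] == xs[i - 1]:
            k += 1
            if k > best:
                best = k
        else:
            k = 1
    # Second pass: collect every value whose run length equals best.
    items = np.empty(xs.size, dtype=xs.dtype)
    n = 0
    k = 1
    for i in range(1, xs.size):
        if xs[i] == xs[i - 1]:
            k += 1
        else:
            if k == best:
                items[n] = xs[i - 1]
                n += 1
            k = 1
    # the final run is never checked inside the loop
    if k == best:
        items[n] = xs[-1]
        n += 1
    return best, items[:n]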

If the code is shortened:

@nb.njit("int64(int64[:])", fastmath=True)
def shortened(x):
    num = np.zeros(x.max() + 1, dtype=x.dtype)
    for k in x:
        num[k] += 1
    return num.max()

Or the parallel version:

@nb.njit("int64(int64[:])", parallel=True, fastmath=True)
def shortened_paralleled(x):
    num = np.zeros(x.max() + 1, dtype=x.dtype)
    for k in nb.prange(x.size):
        num[x[k]] += 1
    return num.max()

Parallelization pays off for larger data volumes. Note that the parallel version produces different results on some runs, because the concurrent increments of num[x[k]] race with each other; this would need to be fixed if possible.
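One way to avoid the race, as a minimal sketch under my own assumptions (the name shortened_parallel_safe and the chunking scheme are not from the original code): give every parallel chunk a private histogram and merge the partial histograms afterwards.

@nb.njit("int64(int64[:])", parallel=True)
def shortened_parallel_safe(x):
    m = x.max() + 1
    n_chunks = nb.get_num_threads()
    chunk = (x.size + n_chunks - 1) // n_chunks
    # One private histogram per chunk, so no two threads write to the same row.
    partial = np.zeros((n_chunks, m), dtype=np.int64)
    for c in nb.prange(n_chunks):
        start = c * chunk
        stop = min(start + chunk, x.size)
        for i in range(start, stop):
            partial[c, x[i]] += 1
    # Merge the private histograms and take the largest count.
    total = np.zeros(m, dtype=np.int64)
    for c in range(n_chunks):
        for j in range(m):
            total[j] += partial[c, j]
    return total.max()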

Handling floats (or negative values) with Numba:

@nb.njit("int8(float64[:])", fastmath=True)
def shortened_float(x):
    # O(n^2) pairwise comparison; the int8 counters overflow once a value
    # occurs more than 127 times.
    num = np.zeros(x.size, dtype=np.int8)
    for k in x:
        for j in range(x.shape[0]):
            if k == x[j]:
                num[j] += 1
    return num.max()

IMO, np.unique(x, return_counts=True)[1].max() is the best option for handling both integers and floats in a very fast implementation. Numba is faster for integers (it depends on the data size; its relative advantage shrinks as the data grows, AFAIK because of the loops rather than the arrays), but for floats the code would have to be optimized further, and I don't think Numba can beat NumPy's unique, especially on large data.

Note: np.bincount can only handle integers (and they must be non-negative).
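As a small illustrative workaround (my addition, not from the post): for negative integers you can shift the data by its minimum before passing it to np.bincount.

x = np.array([-3, -3, 0, 2, 2, 2], dtype=np.int64)
print(np.bincount(x - x.min()).max())  # -> 3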

You can also do it without NumPy:

arr = [1,1,2,2,3,3,4,5,6,1,3,5,7,1]
uniques = list(set(arr))
counts = list(map(list(arr).count, uniques))
uniques[counts.index(max(counts))]

If you want to use NumPy, then try this:

arr = np.array([1,1,2,2,3,3,4,5,6,1,3,5,7,1])
uniques, counts = np.unique(arr, return_counts = True)
uniques[np.where(counts == counts.max())]

Both do exactly the same job. To check which method is more efficient, just do the following:

import time

time_i = time.time()
<arr declaration>  # Creating a new array on each iteration would increase the total time, which would be biased against the numpy method.
for i in range(10**5):
    <method you want>
time_f = time.time()

When I ran this, the first method took 0.39 seconds and the second took 2.69 seconds, so it is safe to say that the first method is more efficient.
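For illustration, this is one way the template could be filled in for the pure-Python method (the concrete choices below are mine, not the answer's):

import time

arr = [1, 1, 2, 2, 3, 3, 4, 5, 6, 1, 3, 5, 7, 1]
time_i = time.time()
for i in range(10**5):
    uniques = list(set(arr))
    counts = list(map(list(arr).count, uniques))
    most_common = uniques[counts.index(max(counts))]
time_f = time.time()
print(time_f - time_i)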

I would say your implementation is almost identical to numpy.bincount. If you want to make it generic, you could consider encoding the raw data first:

def encode(ar):
    # Equivalent to numpy.unique(ar, return_inverse=True)[1] when ar.ndim == 1
    flatten = ar.ravel()
    perm = flatten.argsort()
    sort = flatten[perm]
    mask = np.concatenate(([False], sort[1:] != sort[:-1]))
    encoded = np.empty(sort.shape, np.int64)
    encoded[perm] = mask.cumsum()
    encoded.shape = ar.shape
    return encoded

def count_max(ar):
    return max_count_unique_num(encode(ar))
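As an illustrative usage (my example, not from the answer): after encoding, the labels are small non-negative int64 values, so the original Numba counter also handles floats and negative numbers.

ar = np.array([-1.5, 2.0, 2.0, -1.5, 3.25, 2.0])
print(count_max(ar))  # 3, because 2.0 occurs three times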
