从两个数组中获取相同数量的元素,使得获取的值具有尽可能少的重复项



考虑我们有两个大小为N的数组,它们的值在[0, N-1]的范围内。例如:

a = np.array([0, 1, 2, 0])
b = np.array([2, 0, 3, 3])

我需要生成一个新的数组c,它分别包含ab中的N/2个元素,即必须从两个父数组中均匀/相等地取值。

(对于奇数长度,这将是(N-1)/2(N+1)/2。也可以忽略奇数长度的情况,不重要)。

从两个数组中取相同数量的元素是非常简单的,但是有一个额外的约束:c应该有尽可能多的唯一数字/尽可能少的重复数字。

例如,上面ab的解决方案是:

c = np.array([b[0], a[1], b[2], a[3]])
>>> c
array([2, 1, 3, 0])

注意位置/顺序是保留的。组成cab的每个元素都在相同的位置上。如果c中的i元素来自a,c[i] == a[i],则b也一样。


一个直接的解决方案是简单的路径遍历,很容易实现递归:

def traverse(i, a, b, path, n_a, n_b, best, best_path):
if n_a == 0 and n_b == 0:
score = len(set(path))
return (score, path.copy()) if score > best else (best, best_path)
if n_a > 0:
path.append(a[i])
best, best_path = traverse(i + 1, a, b, path, n_a - 1, n_b, best, best_path)
path.pop()

if n_b > 0:
path.append(b[i])
best, best_path = traverse(i + 1, a, b, path, n_a, n_b - 1, best, best_path)
path.pop()
return best, best_path

这里的n_an_b是我们分别从ab中获取的值,22是我们想要平均获取4项的值。

>>> score, best_path = traverse(0, a, b, [], 2, 2, 0, None)
>>> score, best_path
(4, [2, 1, 3, 0])

是否有一种方法以更矢量化/更有效的方式实现上述,可能通过numpy?

算法速度慢主要是因为它以指数方式运行时间。由于递归,无法直接使用Numpy对该算法进行矢量化。。即使有可能,大量的组合也会导致大多数Numpy实现效率低下(由于需要计算大量Numpy数组)。此外,没有向量化操作来有效地计算许多行的唯一值的数量(通常的方法是使用np.unique,在这种情况下效率不高,并且不能在没有循环的情况下使用)。因此,有两种可能的策略来加快此速度:

  • 试图找到一个算法具有合理的复杂性(例如;<= O(n^4));
  • 使用编译方法,微优化和技巧来编写更快的暴力破解实现。

由于找到正确的次指数算法并不容易,我选择了另一种方法(尽管第一种方法是最好的)。

这个想法是:

  • 通过在整数上循环迭代生成所有可能的解来消除递归;
  • 编写一个快速的方法来计算数组的唯一项;
  • 使用Numba JIT编译器来优化只有一次编译有效的代码。

最后的代码:

import numpy as np
import numba as nb
# Naive way to count unique items.
# This is a slow fallback implementation.
@nb.njit
def naive_count_unique(arr):
count = 0
for i in range(len(arr)):
val = arr[i]
found = False
for j in range(i):
if arr[j] == val:
found = True
break
if not found:
count += 1
return count
# Optimized way to count unique items on small arrays.
# Count items 2 by 2.
# Fast on small arrays.
@nb.njit
def optim_count_unique(arr):
count = 0
for i in range(0, len(arr), 2):
if arr[i] == arr[i+1]:
tmp = 1
for j in range(i):
if arr[j] == arr[i]: tmp = 0
count += tmp
else:
val1, val2 = arr[i], arr[i+1]
tmp1, tmp2 = 1, 1
for j in range(i):
val = arr[j]
if val == val1: tmp1 = 0
if val == val2: tmp2 = 0
count += tmp1 + tmp2
return count
@nb.njit
def count_unique(arr):
if len(arr) % 2 == 0:
return optim_count_unique(arr)
else:
# Odd case: not optimized yet
return naive_count_unique(arr)
# Count the number of bits in a 32-bit integer
# See https://stackoverflow.com/questions/71097470/msb-lsb-popcount-in-numba
@nb.njit('int_(uint32)', inline='always')
def popcount(v):
v = v - ((v >> 1) & 0x55555555)
v = (v & 0x33333333) + ((v >> 2) & 0x33333333)
c = np.uint32((v + (v >> 4) & 0xF0F0F0F) * 0x1010101) >> 24
return c
# Count the number of bits in a 64-bit integer
@nb.njit(inline='always')
def bit_count(n):
if n < (1 << 30):
return popcount(np.uint32(n))
else:
return popcount(np.uint32(n)) + popcount(np.uint32(n >> 32))
# Mutate `out` so not to create an expensive new temporary array
@nb.njit
def int_to_path(n, out, a, b):
for i in range(len(out)):
out[i] = a[i] if ((n >> i) & 1) else b[i]
@nb.njit(['(int32[:], int32[:], int64, int64)', '(int64[:], int64[:], int64, int64)'])
def traverse_fast(a, b, n_a, n_b):
# This assertion is needed because the paths are encoded using 64-bit.
# This should not be a problem in practice since the number of solutions to
# test would be impracticably huge to test using this algorithm anyway.
assert n_a + n_b < 62
max_iter = 1 << (n_a + n_b)
path = np.empty(n_a + n_b, dtype=a.dtype)
score, best_score, best_i = 0, 0, 0
# Iterate over all cases (more than the set of possible solution)
for i in range(max_iter):
# Filter the possible solutions
if bit_count(i) != n_b:
continue
# Analyse the score of the solution
int_to_path(i, path, a, b)
score = count_unique(path)
# Store it if it better than the previous one
if score > best_score:
best_score = score
best_i = i
int_to_path(best_i, path, a, b)
return best_score, path

这个实现大约快30倍在我的机器上大小为8的数组上。我们可以使用几个内核来进一步加快速度。然而,我认为最好集中精力寻找一个次指数实现,以避免浪费更多的计算资源。请注意,路径与初始函数不同,但在随机数组上的分数是相同的。它可以帮助其他人在更大的数组上测试他们的实现,而不用等待很长时间。

仔细测试一下。

import numpy as np
from numpy.random._generator import default_rng

rand = default_rng(seed=1)
n = 16
a = rand.integers(low=0, high=n, size=n)
b = rand.integers(low=0, high=n, size=n)
uniques = np.setxor1d(a, b)
print(a)
print(b)
print(uniques)

def limited_uniques(arr: np.ndarray) -> np.ndarray:
choose = np.zeros(shape=n, dtype=bool)
_, idx, _ = np.intersect1d(arr, uniques, return_indices=True)
idx = idx[:n//2]
choose[idx] = True
n_missing = n//2 - len(idx)
counts = choose.cumsum()
diffs = np.arange(n) - counts
at = np.searchsorted(diffs, n_missing)
choose[:at] = True
return arr[choose]

a_half = limited_uniques(a)
uniques = np.union1d(uniques, np.setdiff1d(a, a_half))
interleaved = np.empty_like(a)
interleaved[0::2] = a_half
interleaved[1::2] = limited_uniques(b)
print(interleaved)
[ 7  8 12 15  0  2 13 15  3  4 13  6  4 13  4  6]
[10  8  1  0 13 12 13  8 13  5  7 12  1  4  1  7]
[ 1  2  3  5  6 10 15]
[ 7 10  8  8 12  1 15  0  0 13  2 12  3  5  6  4]

最新更新