从具有重叠的池中选取无序组合



我有值池,我想通过从某些池中挑选来生成每个可能的无序组合。

例如,我想从池 0、池 0 和池 1 中挑选:

>>> pools = [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
>>> part = (0, 0, 1)
>>> list(product(*(pools[i] for i in part)))
[(1, 1, 2), (1, 1, 3), (1, 1, 4), (1, 2, 2), (1, 2, 3), (1, 2, 4), (1, 3, 2), (1, 3, 3), (1, 3, 4), (2, 1, 2), (2, 1, 3), (2, 1, 4), (2, 2, 2), (2, 2, 3), (2, 2, 4), (2, 3, 2), (2, 3, 3), (2, 3, 4), (3, 1, 2), (3, 1, 3), (3, 1, 4), (3, 2, 2), (3, 2, 3), (3, 2, 4), (3, 3, 2), (3, 3, 3), (3, 3, 4)]

这将通过从池 0、池 0 和池 1 中选取来生成所有可能的组合。

然而,顺序对我来说并不重要,所以许多组合实际上是重复的。例如,由于我使用了笛卡尔乘积,因此会生成(1, 2, 4)(2, 1, 4)

我想出了一种简单的方法来缓解这个问题。对于从单个池中挑选的成员,我选择而不使用combinations_with_replacement订购。我数了数我想从每个池子里抽多少次。代码如下所示:

cnt = Counter()
for ind in part: cnt[ind] += 1
blocks = [combinations_with_replacement(pools[i], cnt[i]) for i in cnt]
return [list(chain(*combo)) for combo in product(*blocks)]

如果我碰巧多次从同一个池中选择,这将减少订购重复项。但是,所有池都有很多重叠,并且在合并的多个池上使用combinations_with_replacement会生成一些无效的组合。有没有更有效的方法来生成无序组合?

编辑:有关输入的额外信息:部件和池的数量很少(~5 和 ~20),为简单起见,每个元素都是一个整数。实际问题我已经解决了,所以这只是为了学术兴趣。假设每个池中有数千个数百个整数,但有些池很小,只有几十个。因此,某种结合或交叉似乎是要走的路。

与其说是"答案",不如说是促使我们更加努力地思考;-) 为了具体起见,我将 OP 的代码(略微重新拼写)包装在一个函数中,该函数也会清除重复项:

def gen(pools, ixs):
from itertools import combinations_with_replacement as cwr
from itertools import chain, product
from collections import Counter
assert all(0 <= i < len(pools) for i in ixs)
seen = set()
cnt = Counter(ixs) # map index to count
blocks = [cwr(pools[i], count) for i, count in cnt.items()]
for t in product(*blocks):
t = tuple(sorted(chain(*t)))
if t not in seen:
seen.add(t)
yield t

我不怕在这里排序 - 它是内存高效的,对于小元组来说,可能比创建Counter对象所涉及的所有开销都快。

但无论如何,这里的重点是强调OP通过重新表述问题以使用combinations_with_replacement(cwr)而获得的真正价值。 请考虑以下输入:

N = 64
pools = [[0, 1]]
ixs = [0] * N

只有 65 个唯一结果,函数会立即生成它们,根本没有内部重复项。 另一方面,本质上相同

pools = [[0, 1]] * N
ixs = range(N)

也有相同的 65 个唯一结果,但基本上永远运行(例如,到目前为止给出的其他答案),通过 2**64 种可能性。cwr在这里没有帮助,因为每个池索引只出现一次。

因此,与任何"仅"从完整的笛卡尔产品中剔除重复项的解决方案相比,都有天文数字的改进空间,其中一些可以通过做OP已经做的事情来赢得。

在我看来,最有希望的方法是编写一个自定义生成器(而不是主要依赖于itertools函数的生成器),该生成器首先按字典顺序生成所有可能性(因此,通过构造,一开始不会创建重复项)。 但这需要首先对输入池进行一些"全局"分析,而我开始的代码很快就变得复杂得多,我现在无法抽出时间来处理。

一个基于@user2357112的答案

cwr与@user2357112的增量重复数据消除相结合,可以得到一个简短的算法,该算法可以在我拥有的所有测试用例中快速运行。 例如,对于上面[0, 1] ** 64示例的两个拼写,它基本上是即时的,并且在 Wood@Joseph 的答案末尾运行示例的速度与他说他的C++代码运行的速度大致相同(在Python 3.7.0下在我的盒子上为0.35秒,是的,找到了162295结果):

def gen(pools, ixs):
from itertools import combinations_with_replacement as cwr
from collections import Counter
assert all(0 <= i < len(pools) for i in ixs)
result = {()}
for i, count in Counter(ixs).items():
result = {tuple(sorted(old + new))
for new in cwr(pools[i], count)
for old in result}
return result

为了让其他 Pythonistas 更容易尝试最后一个示例,以下是可执行 Python 的输入:

pools = [[1, 10, 14, 6],
[7, 2, 4, 8, 3, 11, 12],
[11, 3, 13, 4, 15, 8, 6, 5],
[10, 1, 3, 2, 9, 5,  7],
[1, 5, 10, 3, 8, 14],
[15, 3, 7, 10, 4, 5, 8, 6],
[14, 9, 11, 15],
[7, 6, 13, 14, 10, 11, 9, 4]]
ixs = range(len(pools))

然而,OP后来补充说,他们通常有大约20个池,每个池都有数千个元素。 1000**20 = 1e60 对于任何构建完整笛卡尔积的方法来说都是不切实际的,无论它多么巧妙地剔除重复项。 不过,他们希望有多少是重复的,这一点仍然很清楚,这种"增量重复数据消除"是否足够好,是否足够实用。

理想情况下,我们有一个生成器,一次产生一个结果,按字典顺序。

一次一个的懒惰词典生成

在增量重复数据消除的基础上,假设我们有一个严格递增(字典编纂)的排序元组序列,将相同的元组T附加到每个元组,然后再次对每个元组进行排序。 然后派生序列仍然严格按递增顺序排列。 例如,在左列中,我们有来自range(4)的 10 个唯一对,在右列中,我们将 2 附加到(并再次排序)每个对后会发生什么:

00 002
01 012
02 022
03 023
11 112
12 122
13 123
22 222
23 223
33 233

它们按排序顺序开始,派生的三元组也按排序顺序开始。 我将跳过简单的证明(草图:如果t1t2是相邻的元组,t1 < t2,并让i成为最小的索引,这样t1[i] != t2[i]。 然后t1[i] < t2[i]("词典编纂<"的意思)。 然后,如果您将x扔到两个元组中,请按案例继续:x <= t1[i]? 在t1[i]t2[i]之间? 是x >= t2[i]吗? 在每种情况下,很容易看出第一个派生元组严格小于第二个派生元组。

因此,假设我们有一个排序序列result来自一定数量的池的所有唯一排序元组,当我们将新池P元素添加到元组中时会发生什么? 好吧,如上所述,

[tuple(sorted(old + (P[0],))) for old in result]

也被排序,也是如此

[tuple(sorted(old + (P[i],))) for old in result]

对于range(len(P))的所有i。 这些保证已经排序的序列可以通过heapq.merge()合并,另一个生成器(killdups()下面的)在合并结果上运行,以动态清除重复项。 例如,没有必要保留到目前为止看到的所有元组的集合。 由于合并的输出不会递减,因此只需检查下一个结果是否与最后一个结果输出相同就足够了。

让这个懒惰地工作是微妙的。 到目前为止的结果序列必须由要添加的新池的每个元素访问,但我们不想一口气实现整个事情。 相反,itertools.tee()允许下一个池的每个元素按照自己的节奏遍历到目前为止的结果序列,并在所有新池元素完成每个结果项后自动释放每个结果项的内存。

需要函数build1()(或一些类似工作)来确保在正确的时间访问正确的值。 例如,如果build1()的主体被内联粘贴到调用它的位置,则代码将严重失败(主体将访问绑定到rcopynew的最终值,而不是在创建生成器表达式时绑定到的值)。

总而言之,当然,由于延迟的生成器调用和堆合并层,这有点慢。 作为回报,它按字典顺序返回结果,可以非常快速地开始交付结果,并且如果没有其他原因,除了最终结果序列根本没有实现之外,具有较低的峰值内存负担(在调用方迭代返回的生成器之前几乎不做任何事情)。

技术说明:不要害怕sorted()在这里。 追加是通过old + new完成的,原因如下:old已经排序,new通常是 1 元组。 在这种情况下,Python 的排序是线性时间,而不是O(N log N)

def gen(pools, ixs):
from itertools import combinations_with_replacement as cwr
from itertools import tee
from collections import Counter
from heapq import merge
def killdups(xs):
last = None
for x in xs:
if x != last:
yield x
last = x
def build1(rcopy, new):
return (tuple(sorted(old + new)) for old in rcopy)
assert all(0 <= i < len(pools) for i in ixs)
result = [()]
for i, count in Counter(ixs).items():
poolelts = list(cwr(pools[i], count))
xs = [build1(rcopy, new)
for rcopy, new in zip(tee(result, len(poolelts)),
poolelts)]
result = killdups(merge(*xs))
return result

2 个输入

事实证明,对于 2 输入情况,有一种从集合代数派生出来的简单方法。 如果xy相同,那么答案cwr(x, 2)。 如果xy不相交,product(x, y). 否则xy的交集c是非空的,答案是从 3 个成对不相交集合cx-cy-c得到的 4 个交叉乘积的串联:cwr(c, 2)product(x-c, c)product(y-c, c)product(x-c, y-c)。 证明很简单但很乏味,所以我会跳过它。 例如,cwr(c, 2)product(x-c, c)之间没有重复,因为后者中的每个元组都包含来自x-c的元素,但前者中的每个元组只包含来自c的元素,并且x-cc通过构造是不相交的。product(x-c, y-c)没有重复项,因为两个输入是不相交的(如果它们包含一个共同的元素,那应该是在xy的交集,这与x-c在交集中没有元素相矛盾)。 等。

唉,我还没有找到一种方法来概括超过 2 个输入,这让我感到惊讶。 它可以单独使用,也可以作为其他方法的构建块使用。 例如,如果有很多输入,则可以搜索具有大交集的对,并且此2输入方案用于直接执行整个产品的这些部分

即使只有 3 个输入,我也不清楚如何获得正确的结果

[1, 2], [2, 3], [1, 3]

完整的笛卡尔积有 2**3 = 8 个元素,其中只有一个重复:(1, 2, 3)出现两次(如(1, 2, 3)和再次以(2, 3, 1)出现)。 每对输入都有一个 1 元素交集,但所有 3 个元素的交集为空。

下面是一个实现:

def pair(x, y):
from itertools import product, chain
from itertools import combinations_with_replacement
x = set(x)
y = set(y)
c = x & y
chunks = []
if c:
x -= c
y -= c
chunks.append(combinations_with_replacement(c, 2))
if x:
chunks.append(product(x, c))
if y:
chunks.append(product(y, c))
if x and y:
chunks.append(product(x, y))
return chain.from_iterable(chunks)

基于最大匹配的概念验证

这融合了@Leon草图的想法和评论中勾勒@JosephWoods的方法。 它没有抛光,显然可以加快速度,但在我尝试过的所有情况下它都相当快。 因为它相当复杂,所以无论如何,以已经足够难以遵循的未优化形式发布它可能更有用!

这不会尝试确定"空闲"池的集合(如@Leon的草图所示)。 主要是因为我没有为此编写代码,部分原因是我不清楚如何有效地完成这项工作。 我确实有代码在二分图中查找匹配项,这只需要在此上下文中使用一些更改。

因此,这尝试按字典顺序排列合理的结果前缀,就像@JosephWood的草图一样,并且对于每个前缀,通过检查是否存在二分图匹配来查看是否真的可以构建。

因此,虽然@Leon草图的细节在这里基本上没有实现,但可见的行为大致相同:它按字典顺序生成结果,它不需要检查重复项,它是一个惰性生成器,峰值内存使用与池长度的总和成正比,它显然可以通过多种方式并行化(设置不同的进程以在结果空间的不同区域工作), 使其速度更快的关键在于减少图形匹配函数所做的大量冗余工作(它在每次调用上所做的大量工作只是复制了它在上一次调用中所做的工作)。

def matchgen(pools, ixs):
from collections import Counter
from collections import defaultdict
from itertools import chain, repeat, islice
elt2pools = defaultdict(set)
npools = 0
for i, count in Counter(ixs).items():
set_indices = set(range(npools, npools + count))
for elt in pools[i]:
elt2pools[elt] |= set_indices
npools += count
elt2count = {elt : len(ps) for elt, ps in elt2pools.items()}
cands = sorted(elt2pools.keys())
ncands = len(cands)
result = [None] * npools
# Is it possible to match result[:n] + [elt]*count?
# We already know it's possible to match result[:n], but
# this code doesn't exploit that.
def match(n, elt, count):
def extend(x, seen):
for y in elt2pools[x]:
if y not in seen:
seen.add(y)
if y in y2x:
if extend(y2x[y], seen):
y2x[y] = x
return True
else:
y2x[y] = x
return True
return False
y2x = {}
freexs = []
# A greedy pass first to grab easy matches.
for x in chain(islice(result, n), repeat(elt, count)):
for y in elt2pools[x]:
if y not in y2x:
y2x[y] = x
break
else:
freexs.append(x)
# Now do real work.
seen = set()
for x in freexs:
seen.clear()
if not extend(x, seen):
return False
return True
def inner(i, j): # fill result[j:] with elts from cands[i:]
if j >= npools:
yield tuple(result)
return
for i in range(i, ncands):
elt = cands[i]
# Find the most times `elt` can be added.
count = min(elt2count[elt], npools - j)
while count:
if match(j, elt, count):
break
count -= 1
# Since it can be added `count` times, it can also
# be added any number of times less than `count`.
for k in range(count):
result[j + k] = elt
while count:
yield from inner(i + 1, j + count)
count -= 1
return inner(0, 0)

编辑:请注意,这里有一个潜在的陷阱,由range(10_000)range(100_000)的一对池说明。 产生(9999, 99999)后,第一个位置增加到 10000,然后它继续很长时间,推断出 10001 中的任何一种可能性都不匹配。99999位居第二;然后对于 10001 在第一个位置与 10002 中的任何可能性都不匹配.99999位居第二;等等。 相反,@Leon的方案会注意到range(10_000)是唯一剩下的自由池,在第一个位置选择了 10000,然后立即注意到range(10_000)不包含任何大于 10000 的值。 它显然需要再次这样做 10001, 10002, ..., 99999 在第一个位置。 这是一种线性时间而不是二次时间的周期浪费,但同样是一种浪费。 故事的寓意:在你有实际的代码可以尝试之前,不要相信任何东西;-)

一个基于@Leon的方案

以下是对@Leon思想的忠实实施。 我比上面的"概念验证"(POC)代码更喜欢代码,但惊讶地发现新代码的运行速度明显慢(在类似于@JospephWood随机示例的各种情况下慢3到4倍)相对于POC代码的相对"优化"变体。

主要原因似乎是对匹配函数的更多调用。 POC 代码根据"合理"前缀调用一次。 新代码不会生成任何不可能的前缀,但对于它生成的每个前缀,可能需要进行多次match()调用,以确定剩余的可用池可能较小的集合。 也许有一个更聪明的方法可以做到这一点。

请注意,我添加了一个转折:如果到目前为止,空闲池的元素都小于前缀的最后一个元素,则相对于前缀,它仍然是"免费池",但它是无用的,因为它的任何元素都不能出现在候选中。 这对结果无关紧要,但这意味着池保留在所有剩余递归调用的可用池集中,这反过来意味着他们可以浪费时间确定它仍然是一个"空闲池"。 因此,当免费池不能再用于任何用途时,此版本会将其从免费池集中删除。 这大大加快了速度。

注意:有很多方法可以尝试匹配,其中一些方法具有更好的理论O()最坏情况行为。 根据我的经验,简单的深度优先(如这里)搜索在现实生活中的典型案例中运行得更快。 但这在很大程度上取决于手头应用程序中"典型"图形的特征。 我在这里没有尝试其他方法。

底线,忽略"2 输入"特例代码:

  • 如果您拥有 RAM,那么在速度方面,没有什么比增量重复数据消除更胜一筹了。 但没有什么比峰值内存负担更糟糕的了。

  • 没有什么比基于匹配的节俭记忆负担方法更好的了。 在这个尺度上,他们处于一个完全不同的宇宙中。 它们也是最慢的,尽管至少在同一个宇宙中;-)

代码:

def matchgen(pools, ixs):
from collections import Counter
from collections import defaultdict
from itertools import islice
elt2pools = defaultdict(list)
allpools = []
npools = 0
for i, count in Counter(ixs).items():
indices = list(range(npools, npools + count))
plist = sorted(pools[i])
for elt in plist:
elt2pools[elt].extend(indices)
for i in range(count):
allpools.append(plist)
npools += count
pools = allpools
assert npools == len(pools)
result = [None] * npools
# Is it possible to match result[:n] not using pool
# bady?  If not, return None.  Else return a matching,
# a dict whose keys are pool indices and whose values
# are a permutation of result[:n].
def match(n, bady):
def extend(x, seen):
for y in elt2pools[x]:
if y not in seen:
seen.add(y)
if y not in y2x or extend(y2x[y], seen):
y2x[y] = x
return True
return False
y2x = {}
freexs = []
# A greedy pass first to grab easy matches.
for x in islice(result, n):
for y in elt2pools[x]:
if y not in y2x and y != bady:
y2x[y] = x
break
else:
freexs.append(x)
# Now do real work.
for x in freexs:
if not extend(x, {bady}):
return None
return y2x
def inner(j, freepools): # fill result[j:]
from bisect import bisect_left
if j >= npools:
yield tuple(result)
return
if j:
new_freepools = set()
allcands = set()
exhausted = set()  # free pools with elts too small
atleast = result[j-1]
for pi in freepools:
if pi not in new_freepools:
m = match(j, pi)
if not m:  # match must use pi
continue
# Since `m` is a match to result[:j],
# any pool in freepools it does _not_
# use must still be free.
new_freepools |= freepools - m.keys()
assert pi in new_freepools
# pi is free with respect to result[:j].
pool = pools[pi]
if pool[-1] < atleast:
exhausted.add(pi)
else:
i = bisect_left(pool, atleast)
allcands.update(pool[i:])
if exhausted:
freepools -= exhausted
new_freepools -= exhausted
else: # j == 0
new_freepools = freepools
allcands = elt2pools.keys()
for result[j] in sorted(allcands):
yield from inner(j + 1, new_freepools)
return inner(0, set(range(npools)))

注意:这有自己的"坏情况"类。 例如,传递 128 份[0, 1]副本会在我的盒子上花费大约 2 分钟(!) 的时间才能找到 129 个结果。 POC 代码需要不到一秒钟的时间,而一些不匹配的方法似乎是即时的。

我不会详细说明原因。 可以说,因为所有的池都是一样的,所以无论递归有多深,它们仍然是"自由池"。match()从不遇到困难,总是在第一次(贪婪)传递中找到前缀的完全匹配,但即使这样也需要与当前前缀长度的平方成正比的时间(==当前递归深度)。

务实混合

这里还有一个。 如前所述,基于匹配的方法遭受了一些图匹配作为经常重复的基本操作的代价,并且很容易偶然遇到一些不幸的坏情况。

高度相似的池会导致可用池集缓慢收缩(或根本不收缩)。 但在这种情况下,池是如此相似,以至于元素从哪个池中获取很少重要。 因此,下面的方法不会尝试精确跟踪可用池,而是选择任意池,只要这些池明显可用,并且仅在卡住时才诉诸图形匹配。 这似乎效果很好。 举一个极端的例子,来自 128 个[0, 1]池的 129 个结果在不到十分之一秒的时间内交付,而不是在两分钟内交付。 事实证明,在这种情况下,它永远不需要进行图形匹配。

POC 代码的另一个问题(对于其他基于匹配的方法来说则不那么严重)是在交付最后一个结果后很长一段时间内旋转轮子的可能性。 一个务实的黑客完全解决了这个问题;-) 序列的最后一个元组很容易提前计算,代码会引发内部异常,以便在传递最后一个元组后立即结束所有内容。

对我来说就是这样! 对"两个输入"案例的概括对我来说仍然非常有趣,但我从其他方法中获得的所有痒现在已经被挠到了。

def combogen(pools, ixs):
from collections import Counter
from collections import defaultdict
from itertools import islice
elt2pools = defaultdict(set)
npools = 0
cands = []
MAXTUPLE = []
for i, count in Counter(ixs).items():
indices = set(range(npools, npools + count))
huge = None
for elt in pools[i]:
elt2pools[elt] |= indices
for i in range(count):
cands.append(elt)
if huge is None or elt > huge:
huge = elt
MAXTUPLE.extend([huge] * count)
npools += count
MAXTUPLE = tuple(sorted(MAXTUPLE))
cands.sort()
ncands = len(cands)
ALLPOOLS = set(range(npools))
availpools = ALLPOOLS.copy()
result = [None] * npools
class Finished(Exception):
pass
# Is it possible to match result[:n]? If not, return None.  Else
# return a matching, a dict whose keys are pool indices and
# whose values are a permutation of result[:n].
def match(n):
def extend(x, seen):
for y in elt2pools[x]:
if y not in seen:
seen.add(y)
if y not in y2x or extend(y2x[y], seen):
y2x[y] = x
return True
return False
y2x = {}
freexs = []
# A greedy pass first to grab easy matches.
for x in islice(result, n):
for y in elt2pools[x]:
if y not in y2x:
y2x[y] = x
break
else:
freexs.append(x)
# Now do real work.
seen = set()
for x in freexs:
seen.clear()
if not extend(x, seen):
return None
return y2x
def inner(i, j):  # fill result[j:] with cands[i:]
nonlocal availpools
if j >= npools:
r = tuple(result)
yield r
if r == MAXTUPLE:
raise Finished
return
restore_availpools = None
last = None
jp1 = j + 1
for i in range(i, ncands):
elt = cands[i]
if elt == last:
continue
last = result[j] = elt
pools = elt2pools[elt] & availpools
if pools:
pool = pools.pop() # pick one - arbitrary
availpools.remove(pool)
else:
# Find _a_ matching, and if that's possible fiddle
# availpools to pretend that's the one we used all
# along.
m = match(jp1)
if not m: # the prefix can't be extended with elt
continue
if restore_availpools is None:
restore_availpools = availpools.copy()
availpools = ALLPOOLS - m.keys()
# Find a pool from which elt was taken.
for pool, v in m.items():
if v == elt:
break
else:
assert False
yield from inner(i+1, jp1)
availpools.add(pool)
if restore_availpools is not None:
availpools = restore_availpools
try:
yield from inner(0, 0)
except Finished:
pass

这是一个难题。我认为在一般情况下,您最好的选择是实现一个hash table,其中键是multiset,值是您的实际组合。这与@ErikWolf提到的类似,但是此方法首先避免了产生重复项,因此不需要过滤。当我们遇到multisets时,它还返回正确的结果。

有一个更快的解决方案,我现在正在取笑,但要留待以后使用。请耐心等待。

如评论中所述,一种似乎可行的方法是合并所有池,并简单地生成此组合池的组合,选择池的数量。您需要一个能够生成多集组合的工具,我知道有一个工具可以在python中找到。它在sympy图书馆from sympy.utilities.iterables import multiset_combinations。这样做的问题是,我们仍然产生重复的值,更糟糕的是,我们产生的结果是用类似的setproduct组合无法获得的。例如,如果我们要执行诸如排序和组合 OP 中的所有池并应用以下内容的操作:

list(multiset_permutations([1,2,2,3,3,4,4,5]))

有几个结果是[1 2 2][4 4 5],这两者都不可能从[[1, 2, 3], [2, 3, 4], [3, 4, 5]]获得。

除了特殊情况之外,我不明白如何避免检查每个可能的产品。我希望我错了。

算法概述
主要思想是将我们的向量乘积的组合映射到唯一组合,而无需过滤掉重复项。OP给出的例子(即(1, 2, 3)(1, 3, 2)) 应该只映射到一个值(其中任何一个,因为顺序无关紧要)。我们注意到这两个向量是相同的集合。现在,我们也有这样的情况:

vec1 = (1, 2, 1)
vec2 = (2, 1, 1)
vec3 = (2, 2, 1)

我们需要vec1vec2映射到相同的值,而vec3需要映射到自己的值。这就是集合的问题,因为所有这些都是等价集合(对于集合,元素是唯一的,因此{a, b, b}{a, b}是等价的)。

这就是多集发挥作用的地方。对于多集,(2, 2, 1)(1, 2, 1)是不同的,但(1, 2, 1)(2, 1, 1)是相同的。这很好。我们现在有一种方法可以生成唯一键。

由于我不是python程序员,所以我将在C++继续。

如果我们尝试完全按原样实现上述所有内容,我们将遇到一些问题。据我所知,你不能把std::multiset<int>作为std::unordered_map的关键部分。但是,我们可以 定期std::map.它的性能不如下面的哈希表(它实际上是一棵红黑树),但它仍然提供了不错的性能。在这里:

void cartestionCombos(std::vector<std::vector<int> > v, bool verbose) {
std::map<std::multiset<int>, std::vector<int> > cartCombs;
unsigned long int len = v.size();
unsigned long int myProd = 1;
std::vector<unsigned long int> s(len);
for (std::size_t j = 0; j < len; ++j) {
myProd *= v[j].size();
s[j] = v[j].size() - 1;
}
unsigned long int loopLim = myProd - 1;
std::vector<std::vector<int> > res(myProd, std::vector<int>());
std::vector<unsigned long int> myCounter(len, 0);
std::vector<int> value(len, 0);
std::multiset<int> key;
for (std::size_t j = 0; j < loopLim; ++j) {
key.clear();
for (std::size_t k = 0; k < len; ++k) {
value[k] = v[k][myCounter[k]];
key.insert(value[k]);
}
cartCombs.insert({key, value});
int test = 0;
while (myCounter[test] == s[test]) {
myCounter[test] = 0;
++test;
}
++myCounter[test];
}
key.clear();
// Get last possible combination
for (std::size_t k = 0; k < len; ++k) {
value[k] = v[k][myCounter[k]];
key.insert(value[k]);
}
cartCombs.insert({key, value});
if (verbose) {
int count = 1;
for (std::pair<std::multiset<int>, std::vector<int> > element : cartCombs) {
std::string tempStr;
for (std::size_t k = 0; k < len; ++k)
tempStr += std::to_string(element.second[k]) + ' ';
std::cout << count << " : " << tempStr << std::endl;
++count;
}
}
}

使用 8 个长度从 4 到 8 的向量的测试用例,填充了从 1 到 15 的随机整数,上述算法在我的计算机上运行大约 5 秒。考虑到我们正在查看我们产品的近 250 万个总结果,这还不错,但我们可以做得更好。但是怎么做呢?

最佳性能由std::unordered_map提供,该密钥在恒定时间内内置。我们上面的键是在对数时间(多集、映射和哈希映射复杂性)中构建的。所以问题是,我们如何克服这些障碍?

最佳性能

我们知道我们必须放弃std::multiset。我们需要某种具有交换类型属性的对象,同时还要给出唯一的结果。

输入算术基本定理

它指出每个数都可以由素数的乘积唯一表示(直到因子的顺序)。这有时称为素分解。

所以现在,我们可以像以前一样简单地进行,但不是构建多集,而是将每个索引映射到一个素数并将结果相乘。这将为我们的密钥提供一个恒定的时间构造。下面是一个示例,显示了这种技术在我们上面创建的示例中的强大功能(注意。P下面是素数列表...(2, 3, 5, 7, 11, etc.)

Maps to                    Maps to            product
vec1 = (1, 2, 1)    -->>    P[1], P[2], P[1]   --->>   3, 5, 3    -->>    45
vec2 = (2, 1, 1)    -->>    P[2], P[1], P[1]   --->>   5, 3, 3    -->>    45
vec3 = (2, 2, 1)    -->>    P[2], P[2], P[1]   --->>   5, 5, 3    -->>    75

这太棒了!!vec1vec2映射到相同的数字,而vec3映射到不同的值,就像我们希望的那样。

void cartestionCombosPrimes(std::vector<std::vector<int> > v, 
std::vector<int> primes,
bool verbose) {
std::unordered_map<int64_t, std::vector<int> > cartCombs;
unsigned long int len = v.size();
unsigned long int myProd = 1;
std::vector<unsigned long int> s(len);
for (std::size_t j = 0; j < len; ++j) {
myProd *= v[j].size();
s[j] = v[j].size() - 1;
}
unsigned long int loopLim = myProd - 1;
std::vector<std::vector<int> > res(myProd, std::vector<int>());
std::vector<unsigned long int> myCounter(len, 0);
std::vector<int> value(len, 0);
int64_t key;
for (std::size_t j = 0; j < loopLim; ++j) {
key = 1;
for (std::size_t k = 0; k < len; ++k) {
value[k] = v[k][myCounter[k]];
key *= primes[value[k]];
}
cartCombs.insert({key, value});
int test = 0;
while (myCounter[test] == s[test]) {
myCounter[test] = 0;
++test;
}
++myCounter[test];
}
key = 1;
// Get last possible combination
for (std::size_t k = 0; k < len; ++k) {
value[k] = v[k][myCounter[k]];
key *= primes[value[k]];
}
cartCombs.insert({key, value});
std::cout << cartCombs.size() << std::endl;
if (verbose) {
int count = 1;
for (std::pair<int, std::vector<int> > element : cartCombs) {
std::string tempStr;
for (std::size_t k = 0; k < len; ++k)
tempStr += std::to_string(element.second[k]) + ' ';
std::cout << count << " : " << tempStr << std::endl;
++count;
}
}
}

在上面将生成近 250 万个产品的同一示例中,上述算法在不到 0.3 秒的时间内返回相同的结果。

后一种方法有几个注意事项。我们必须先地生成素数,如果我们的笛卡尔积中有许多向量,则密钥可能会超出int64_t的范围。第一个问题应该不难克服,因为有许多资源(库、查找表等)可用于生成素数。我不太确定,但我读到后一个问题对python来说应该不是问题,因为整数具有任意精度(Python 整数范围)。

我们还必须处理这样一个事实,即我们的源向量可能不是具有小值的整数向量。这可以通过在继续之前对所有向量中的所有元素进行排名来补救。例如,给定以下向量:

vec1 = (12345.65, 5, 5432.11111)
vec2 = (2222.22, 0.000005, 5)
vec3 = (5, 0.5, 0.8)

对它们进行排名,我们将得到:

rank1 = (6, 3, 5)
rank2 = (4, 0, 3)
rank3 = (3, 1, 2)

现在,这些可以用来代替实际值来创建密钥。代码中唯一会更改的部分是构建密钥的 for 循环(当然还有需要创建的rank对象):

for (std::size_t k = 0; k < len; ++k) {
value[k] = v[k][myCounter[k]];
key *= primes[rank[k][myCounter[k]]];
}

编辑:
正如一些评论者指出的那样,上述方法掩盖了必须生成所有产品的事实。我应该在第一次说。就个人而言,鉴于许多不同的演示文稿,我不明白如何避免它。

另外,如果有人好奇,这是我上面使用的测试用例:

[1 10 14  6],
[7  2  4  8  3 11 12],
[11  3 13  4 15  8  6  5],
[10  1  3  2  9  5  7],
[1  5 10  3  8 14],
[15  3  7 10  4  5  8  6],
[14  9 11 15],
[7  6 13 14 10 11  9  4]

它应该返回162295独特的组合。

节省一些工作的一种方法可能是生成前 k 个选定池的重复数据删除组合,然后将其扩展到第一个 k+1 的重复数据删除组合。这样可以避免单独生成和拒绝所有从前两个池中选取2, 1而不是1, 2的 length-20 组合:

def combinations_from_pools(pools):
# 1-element set whose one element is an empty tuple.
# With no built-in hashable multiset type, sorted tuples are probably the most efficient
# multiset representation.
combos = {()}
for pool in pools:
combos = {tuple(sorted(combo + (elem,))) for combo in combos for elem in pool}
return combos

但是,对于您所说的输入大小,无论您生成组合的效率如何,您都永远无法处理所有组合。即使有 20 个相同的 1000 元素池,也会有496432432489450355564471512635900731810050组合(1019 选择20,通过星条公式),或大约 5e41。如果你征服了地球,并将全人类所有计算设备的所有处理能力都投入到这项任务中,你仍然无法在这方面有所作为。您需要找到一种更好的方法来解决您的基本任务。

到目前为止发布的答案(包括 Tim Peters 的懒惰词典一次生成)具有与输出大小成正比的最坏情况空间复杂性。我将概述一种方法,该方法将建设性地生成所有唯一的无序组合,而无需对内部生成的中间数据进行重复数据删除。我的算法按字典排序顺序生成组合。与更简单的算法相比,它具有计算开销。但是,它可以并行化(以便可以同时生成最终输出的不同范围)。

这个想法如下。

所以我们有N池{P1, ..., PN},我们必须从中抽取我们的组合。我们可以很容易地识别最小的组合(关于提到的词典顺序)。顺其自然 (x 1, x 2 ..., xN-1, xN) (其中x 1<= x2<<sub>= ...<= xN-1<= xN,每个 xj只是其中一个池 {Pi} 中的最小元素)。这个最小的组合后面将跟着零个或多个组合,其中前缀 x 1、x2...、xN-1相同,最后一个位置在递增的值序列上运行。我们如何识别该序列?

让我们介绍以下定义:

给定一个组合前缀 C=(x 1, x2..., xK-1, x K)(其中K<N),如果池>C 称为自由池 Pi

识别给定前缀的可用池很容易简化为在二分图中查找最大匹配的问题。具有挑战性的部分是有效地做到这一点(利用我们案例的具体情况)。但是我会保存它以备后用(这是正在进行的工作,将在一天内实现为Python程序)。

因此,对于第一个组合的前缀 (x 1, x2..., xN-1),我们可以识别所有可用池 {FPi}。它们中的任何一个都可用于为最后一个位置选取元素。因此,感兴趣的序列是 {FP1UFP 2U ...} 中大于或等于 xN-1的元素的排序集。

当最后一个仓位用尽时,我们必须增加最后一个仓位,然后我们将重复查找最后一个仓位的可能值的过程。令人惊讶的是,枚举最后一个但一个(以及任何其他)位置的值的过程是相同的 - 唯一的区别是组合前缀的长度,必须根据该前缀来标识可用池。

因此,以下递归算法应该完成这项工作:

  1. 从空的组合前缀 C 开始。此时,所有游泳池都是免费的。
  2. 如果 C 的长度等于 N,则输出 C 并返回。
  3. 可用池合并到一个排序列表 S 中,并从中删除小于 C 最后一个元素的所有元素。
  4. 对于 S 中的每个值 x 做
    • 新的组合前缀是 C'=(C, x)
    • 由于当前组合前缀增加了 1,因此某些可用池不再可用。识别它们并使用更新的可用池列表和组合前缀 C' 递归到步骤 1。

你可以实现一个可哈希列表并使用python set()来过滤所有重复项。 您的哈希函数只需要忽略列表中的顺序,这可以通过使用集合来实现。计数器

from collections import Counter
class HashableList(list):
def __hash__(self):
return hash(frozenset(Counter(self)))
def __eq__(self, other):
return hash(self) == hash(other)
x = HashableList([1,2,3])
y = HashableList([3,2,1])
print set([x,y])

这将返回:

set([[1, 2, 3]])

这是我想到的:

class Combination:
def __init__(self, combination):
self.combination = tuple(sorted(combination))
def __eq__(self, other):
return self.combination == self.combination
def __hash__(self):
return self.combination.__hash__()
def __repr__(self):
return self.combination.__repr__()
def __getitem__(self, i):
return self.combination[i]

然后

pools = [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
part = (0, 0, 1)
set(Combination(combin) for combin in product(*(pools[i] for i in part)))

输出:

{(1, 1, 2),
(1, 1, 3),
(1, 1, 4),
(1, 2, 2),
(1, 2, 3),
(1, 2, 4),
(1, 3, 3),
(1, 3, 4),
(2, 2, 2),
(2, 2, 3),
(2, 2, 4),
(2, 3, 3),
(2, 3, 4),
(3, 3, 3),
(3, 3, 4)}

不确定这是否真的是您要找的。

最新更新