如何使用多处理将重复项放在一个非常大的列表中?



假设我有一个包含随机数的庞大列表,例如

L = [random.randrange(0,25000000000) for _ in range(1000000000)]

我需要删除此列表中的重复项

我为包含较少元素的列表编写了此代码

def remove_duplicates(list_to_deduplicate):
seen = set()
result=[]
for i in list_to_deduplicate:
if i not in seen:
result.append(i)
seen.add(i)
return result

在上面的代码中,我创建了一个集合,以便我可以记住我正在处理的列表中已经出现了哪些数字,如果该数字不在集合中,那么我将其添加到结果列表中,我需要返回并将其保存在集合中,这样它就不会再次添加到结果列表中

现在对于列表中的 1000000 个数字,一切都很好,我可以快速得到结果,但对于优于 1000000000 个问题的数字,我需要使用机器上的不同内核来尝试分解问题,然后合并来自多个进程的结果

我的第一个猜测是让所有进程都可以访问一组,但会出现许多复杂性 当另一个进程添加到集合中时,如何读取,我什至不知道是否可以在进程之间共享一个集合我知道我们可以使用队列或管道,但我不确定如何使用它

有人可以给我一个关于解决这个问题的最佳方法的建议吗 我对任何新想法都持开放态度

我怀疑,即使是你最伟大的列表也足够大,以至于多处理会改善时序。使用 numpy 和多线程可能是您最好的机会。

多处理引入了相当多的开销并增加了内存消耗@Frank就像前面正确提到的 Merrow 一样。 但是,对于多线程来说,情况并非如此(扩展到那个扩展(。不要混淆这些术语很重要,因为进程和线程是不一样的。 同一进程中的线程共享其内存,不同的进程不共享。

在Python中使用多核的问题是GIL,它不允许多个线程(在同一进程中(并行执行Python字节码。一些像numpy这样的C扩展可以释放GIL,这使得从多线程的多核并行性中受益。这是您通过使用numpy在重大改进之上获得一些速度的机会。

from multiprocessing.dummy import Pool  # .dummy uses threads
import numpy as np
r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)
n_threads = 8
result = np.unique(np.concatenate(
Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()

使用 numpy 和线程池,拆分数组,使子数组在单独的线程中唯一,然后连接子数组并使重新组合的数组再次唯一。 重新组合数组的最终删除重复项是必要的,因为在子数组中只能识别本地重复项。

对于低熵数据(许多重复(,使用pandas.unique而不是numpy.unique可以更快。与numpy.unique不同,它还保留了外观顺序。

请注意,使用像上面这样的线程池只有在 numpy-function 还没有通过调用低级数学库在后台进行多线程时才有意义。因此,请始终进行测试以查看它是否真的提高了性能,并且不要将其视为理所当然。


使用范围内的 100M 随机生成整数进行测试:

高熵:0 - 25_000_000
  • _000(199560 重复(
  • 低熵:0 - 1000

法典

import time
import timeit
from multiprocessing.dummy import Pool  # .dummy uses threads
import numpy as np
import pandas as pd

def time_stmt(stmt, title=None):
t = timeit.repeat(
stmt=stmt,
timer=time.perf_counter_ns, repeat=3, number=1, globals=globals()
)
print(f"t{title or stmt}")
print(f"tt{min(t) / 1e9:.2f} s")

if __name__ == '__main__':
n_threads = 8  # machine with 8 cores (4 physical cores)
stmt_np_unique_pool = 
"""
np.unique(np.concatenate(
Pool(n_threads).map(np.unique, np.array_split(r, n_threads)))
).tolist()
"""
stmt_pd_unique_pool = 
"""
pd.unique(np.concatenate(
Pool(n_threads).map(pd.unique, np.array_split(r, n_threads)))
).tolist()
"""
# -------------------------------------------------------------------------
print(f"nhigh entropy (few duplicates) {'-' * 30}n")
r = np.random.RandomState(42).randint(0, 25000000000, 100_000_000)
r = list(r)
time_stmt("list(set(r))")
r = np.asarray(r)
# numpy.unique
time_stmt("np.unique(r).tolist()")
# pandas.unique
time_stmt("pd.unique(r).tolist()")    
# numpy.unique & Pool
time_stmt(stmt_np_unique_pool, "numpy.unique() & Pool")
# pandas.unique & Pool
time_stmt(stmt_pd_unique_pool, "pandas.unique() & Pool")
# ---
print(f"nlow entropy (many duplicates) {'-' * 30}n")
r = np.random.RandomState(42).randint(0, 1000, 100_000_000)
r = list(r)
time_stmt("list(set(r))")
r = np.asarray(r)
# numpy.unique
time_stmt("np.unique(r).tolist()")
# pandas.unique
time_stmt("pd.unique(r).tolist()")
# numpy.unique & Pool
time_stmt(stmt_np_unique_pool, "numpy.unique() & Pool")
# pandas.unique() & Pool
time_stmt(stmt_pd_unique_pool, "pandas.unique() & Pool")

就像你在下面的时间中看到的那样,仅仅使用 numpy 而不使用多线程已经占了最大的性能改进。另请注意pandas.unique()对于许多重复项,比numpy.unique()(仅(更快。

high entropy (few duplicates) ------------------------------
list(set(r))
32.76 s
np.unique(r).tolist()
12.32 s
pd.unique(r).tolist()
23.01 s
numpy.unique() & Pool
9.75 s
pandas.unique() & Pool
28.91 s
low entropy (many duplicates) ------------------------------
list(set(r))
5.66 s
np.unique(r).tolist()
4.59 s
pd.unique(r).tolist()
0.75 s
numpy.unique() & Pool
1.17 s
pandas.unique() & Pool
0.19 s

不能说我喜欢这个,但它应该在时尚之后起作用。

将数据分成N个只读部分。为每个工作人员分配一个以研究数据。 所有内容都是只读的,因此可以共享所有内容。每个工作线程i 1...N 根据所有其他"未来"列表i+1...N检查其列表

每个工作线程都为其i+1...N列表维护一个位表,记录其任何项目是否命中任何未来项目。

当每个人都完成后,工人i将其位表发送回主,在那里可以进行ANDed。 然后删除零。没有排序没有集合。 检查不快,呵呵。

如果你不想打扰多个位表,你可以让每个工人自己的职责区域上方找到一个重复值时写零。 但是,现在您遇到了真正的共享内存问题。 就此而言,您甚至可以让每个作品删除其区域上方的 dup,但同上。

即使将工作分开也回避了这个问题。对于每个工人来说,在其他人的列表中为每个自己的条目走路是很昂贵的。*(N-1(镜头(区域(/2. 每个工作人员都可以创建一组区域,或对其区域进行排序。 两者都允许更快的检查,但成本会增加。

相关内容

  • 没有找到相关文章

最新更新