分而治之
算法-- 将函数和列表作为输入。 返回函数(列表)
这个位很简单,它变得更酷,因为它使用多处理模块来拆分列表,然后以不同的位处理它并返回一个列表。 (这是下面的.py文件的全部内容,只需将所有代码块复制到python3的.py文件中,您应该会看到问题。
得到了我的进口
import multiprocessing as multi
import numpy as np
import pickle
import os
一种记录事物的方法(这似乎不想在此过程中起作用)
def log(text):
text = str(text)
with open(text, 'w') as file:
file.write('Nothing')
函数包装器
此函数的目标是获取一个函数,并通过从磁盘中提取数据来处理为其提供数据。主要是因为 Pipes 最终会出现一个我找不到解决方案的错误。
def __wrap(function):
filename = multi.current_process().name + '.tmp'
with open(filename, 'rb') as file:
item_list = pickle.load(file)
result = function(item_list)
with open(filename, 'wb') as file:
pickle.dump(result, file)
肉和土豆
这会将列表划分为较小的列表,供每个 CPU 吞噬,然后为每个块启动小进程。它将输入数据保存到磁盘上,以便 __wrap() 函数拉起。最后,它提取已写入磁盘的结果 bt __wrap() 函数,将它们连接成单个列表并返回值。
def divide_and_conquer(f, things):
cpu_count = multi.cpu_count()
chunks = np.array_split(things ,cpu_count )
cpus = []
for cpu in range(cpu_count):
filename = '{}.tmp'.format(cpu)
with open(filename, 'wb') as file:
pickle.dump(chunks[cpu], file)
p = multi.Process(name = str(cpu), target = __wrap, args = (f,))
p.start()
cpus.append(p)
for cpu in cpus:
cpu.join()
done = []
for cpu in cpus:
filename = '{}.tmp'.format(cpu.name)
with open(filename, 'rb') as file:
data = pickle.load(file)
os.remove(filename)
done.append(data)
try:
done = np.concatenate(done)
except ValueError:
pass
return done
测试样品
to_do = list(range(10))
def func(thins):
for thin in thins:
thin
return [0, 1, 2,3]
divide_and_conquer(func, to_do)
这只是没有预期的输出,它只是出于某种原因输出输入。
最终我的目标是加快长时间运行的计算速度。我经常发现自己在处理列表时,每个项目都需要几秒钟才能解析。(网页抓取等)我几乎只想将此工具添加到我的小"常用代码片段库"中,这样我就可以导入并继续
"rt.divide_and_conquer(tough_function,really_long_list)">
并看到简单的 8 倍改进。 我目前在Windows上看到这个问题(还没有在我的linux盒子上测试它),我的阅读表明显然Linux和Windows处理多处理的方式不同。
你不需要重新发明轮子。如果我理解您要正确实现的目标,那么 concurrent.futures 模块就是您所需要的。
ProcessPoolExecutor 执行拆分列表、启动多个进程(使用默认设置的最大可用线程数)并将函数应用于这些列表中的每个元素的工作。