我正在尝试一次读取文件的多行,以便将这些行分成两个单独的列表。cleanLine 函数实质上是接收它被馈送的行并清理它,返回一行没有空格。现在我的代码编译并返回与没有多处理相同的结果,但是,脚本的整体运行时没有改善,所以我不确定它是否实际上一次生成多个进程,或者它是否只是一次做一个。在这种特定情况下,我不确定如何判断它实际上是创建多个进程还是仅创建一个进程。脚本的这一部分运行速度不快,还是我做得不正确?任何帮助或反馈将不胜感激。
代码片段:
import multiprocessing
from multiprocessing import Pool
filediff = open("sample.txt", "r", encoding ="latin-1")
filediffline = filediff.readlines()
pos = []
neg = []
cpuCores = multiprocessing.cpu_count() - 1
pool = Pool(processes = cpuCores)
for line in filediffline:
result = pool.apply_async(cleanLine, [line]).get()
if line.startswith("+"):
pos.append(result)
elif line.startswith("-"):
neg.append(result)
pool.close()
pool.join()
如前所述,result = pool.apply_async(cleanLine, [line]).get()
向子进程发送一行并等待它返回。这比仅在父进程中进行处理要慢。即使你返工了那个位,也不太可能有任何速度,除非预处理是CPU密集型的。
另一种方法是构建管道,方法是将预处理放入单独的文件中并使用subprocess.Popen
执行它或使用multiprocessing.Pipe
.使用此方法,文件读取和行处理都在单独的进程中完成。
这样做的好处是文件读取+预处理与主进程的工作重叠。但是,如果与序列化对象以将其从一个进程获取到另一个进程的成本相比,该预处理微不足道,则不会看到任何加速。
import multiprocessing as mp
pos = []
neg = []
def line_cleaner(line):
return line.strip()
def cleaner(filename, encoding, pipe):
try:
with open(filename, encoding=encoding) as fp:
for line in fp:
line = line_cleaner(line)
if line:
pipe.send(line)
finally:
pipe.close()
if __name__ == "__main__":
receiver, sender = mp.Pipe(duplex=False)
process = mp.Process(target=cleaner,
args=("sample.txt", "latin-1", sender))
process.start()
sender.close() # so child holds only reference
try:
while True:
line = receiver.recv()
if line.startswith("+"):
pos.append(line)
elif line.startswith("-"):
neg.append(line)
except EOFError:
pass # child exit
finally:
process.join()
print(pos, neg)
使用apply_async().get()
等效于阻止调用apply()
。对于异步处理,请尝试利用带有回调参数的apply_async
来处理结果。请记住,回调是在单独的线程中调用的。
您正在使用IO
. 我不确定您的处理是CPU-bound
还是IO-Bound
操作/过程。如前所述,如果您读取整行list
,这意味着您读取的所有IO
都位于 RAM 中(在这种情况下考虑使用 file.read((!如果你的数据或文件太大,这会产生副作用(,并且所有在列表中完成的数据处理,那么你会看到性能有所提升(取决于列表大小(, 只有在这种情况下,在RAM上有足够大的列表时,我建议使用concurent.futures
模块,见下文:
import concurrent.futures
def process_line(line):
return line.strip()
def execute(filename):
lines = []
with open(filename, encoding=encoding) as fp:
lines = fp.read()
with concurrent.futures.ProcessPoolExecutor() as executor:
results = [executor.submit(process_line(line)) for line in lines]