Python多处理卡住了(也许正在阅读CSV)



我正在尝试学习如何使用multiprocessing,但我遇到了问题。

我正在尝试运行此代码:

import multiprocessing as mp
import random
import string
random.seed(123)
# Define an output queue
output = mp.Queue()
# define a example function
def rand_string(length, output):
""" Generates a random string of numbers, lower- and uppercase chars. """
rand_str = ''.join(random.choice(
string.ascii_lowercase
+ string.ascii_uppercase
+ string.digits)
for i in range(length))
output.put(rand_str)
# Setup a list of processes that we want to run
processes = [mp.Process(target=rand_string, args=(5, output)) for x in range(4)]
# Run processes
for p in processes:
p.start()
# Exit the completed processes
for p in processes:
p.join()
# Get process results from the output queue
results = [output.get() for p in processes]
print(results)

从这里

代码本身运行正常,但是当我用我的函数替换rand_string(读取 Pandas 数据帧中的一堆 csv 文件(时,代码永远不会结束。

函数是这样的:

def readMyCSV(clFile):
aClTable = pd.read_csv(clFile)
# I do some processing here, but at the end the 
# function returns a Pandas DataFrame
return(aClTable)

然后我包装函数,以便它允许在参数中Queue

def readMyCSVParWrap(clFile, outputq):
outputq.put(readMyCSV(clFile))

我通过以下方式构建流程:

processes = [mp.Process(target=readMyCSVParWrap, args=(singleFile,output)) for singleFile in allFiles[:5]]

如果我这样做,代码永远不会停止运行,结果永远不会打印。

如果我只将 clFile 字符串放在输出队列中,例如:

outputq.put((clFile))

结果打印正确(只是 clFiles 的列表(

当我查看htop时,我看到生成了 5 个进程,但它们不使用任何 CPU。

最后,如果我自己运行readMyCSV函数,它可以正常工作(返回一个Pandas数据帧(

我做错了什么吗? 我正在Jupyter笔记本中运行它,也许这是一个问题?

似乎您对进程的join语句导致死锁。进程无法终止,因为它们会等到队列中的项被使用,但在代码中,这仅在联接后发生。

加入使用队列的进程

请记住,将项目放入队列的进程将在终止之前等待,直到"馈送器"线程将所有缓冲项目馈送到底层管道。(子进程可以调用队列的 Queue.cancel_join_thread 方法来避免此行为。

这意味着,无论何时使用队列,都需要确保在加入流程之前,最终将删除队列中的所有项目。否则,您无法确定已将项目放入队列的进程将终止。另请记住,非守护进程将自动加入。 文档

文档进一步建议将行与queue.getjoin交换,或者只是删除join

同样重要的是:

确保主模块可以由新的 Python 解释器安全地导入,而不会引起意外的副作用(例如启动新进程(......通过使用 if名称== 'main': 来保护程序的"入口点"。同前

相关内容

  • 没有找到相关文章

最新更新