我是一个多处理的新手,我正在努力加快我的旧算法。它工作得很好,没有多处理,但在我尝试实现它的那一刻,程序停止了工作:它一直待命,直到我中止脚本。另一个问题是它没有填充数据帧:同样,它正常工作,但对于多处理,它只返回NaN。
func运行良好。
stockUniverse = list(map(lambda s: s.strip(), Stocks)) #Stocks = list
def func(i):
df.at[i, 'A'] = 1
df.at[i, 'B'] = 2
df.at[i, 'C'] = 3
print(i, 'downloaded')
return True
if __name__ == "__main__":
print('Start')
pool = mp.Pool(mp.cpu_count())
pool.imap(func, stockUniverse)
print(df)
结果是:
Index 19 NaN NaN NaN
index 20 NaN NaN NaN
然后它停在那里,直到我击中Ctrl+C
。感谢
map
函数阻塞,直到所有提交的任务都完成,并从辅助函数返回返回值列表。但是imap
函数会立即返回,并且带有迭代器,当每个值都可用时,必须迭代该迭代器才能逐个返回。您的原始代码没有迭代迭代器,而是立即打印出它所期望的更新的df
。但是,您可能没有给任务足够的时间来启动和完成df
进行修改。理论上,如果您在print
语句之前插入了对time.sleep
的调用足够长的时间,那么在打印出df
之前,任务就已经开始并完成了。但显然,迭代迭代器是确保所有任务都已完成的最有效方法,也是返回返回值的唯一方法。
但是,正如我在评论中提到的,你有一个更大的问题。您提交的任务由您创建的进程池中的进程调用的辅助函数func
执行,每个进程都在自己的地址空间中执行。您没有用正在运行的平台标记您的问题(无论何时用multiprocessing
标记问题,都应该用平台标记问题(,但我可能推断您正在使用spawn
方法创建新进程的平台下运行,例如Windows,这就是为什么您有if __name__ == "__main__":
块控制代码来创建新的进程(即处理池(。当spawn
用于创建新进程时,会创建一个新的空地址空间,启动一个新Python解释器,并从顶部重新执行源代码(如果没有创建新进程的if __name__ == "__main__":
块控制代码,您将进入创建新进程中的无限递归循环(。但这意味着,在if __name__ == "__main__":
块(如果在Windows下运行,则必须省略该块(之外的全局范围内对df
的任何定义,都将在创建每个进程时为池中的每个进程创建一个新的独立实例。
如果您改为在Linux
下运行,其中fork
用于创建新流程,则情况会有所不同。新进程将从主进程和所有声明的变量继承原始地址空间,但使用写入时复制。这意味着,一旦子流程尝试修改此继承存储中的任何变量,就会生成页面的副本,并且该流程现在将使用自己的副本。因此,不能为了更新目的而共享任何内容。
因此,您应该修改您的程序,使您的辅助函数将值返回到主进程,主进程将进行必要的更新:
import multiprocessing as mp
import pandas as pd
def func(stock):
return (stock, (('A', 1), ('B', 1), ('C', 1)))
if __name__ == "__main__":
stockUniverse = ['abc', 'def', 'ghi', 'klm']
d = {col: pd.Series(index=stockUniverse, dtype='int32') for col in ['A', 'B', 'C']}
df = pd.DataFrame(d)
pool_size = min(mp.cpu_count(), len(stockUniverse))
pool = mp.Pool(pool_size)
for result in pool.imap_unordered(func, stockUniverse):
stock, col_values = result # unpack
for col_value in col_values:
col, value = col_value # unpack
df.at[stock, col] = value
print(df)
打印:
A B C
abc 1 1 1
def 1 1 1
ghi 1 1 1
klm 1 1 1
请注意,我使用了imap_unordered
而不是imap
。前一种方法允许以任意顺序返回结果(即,当结果可用时(,并且通常更高效,并且由于返回值包含设置df
的正确行所需的所有信息,因此我们不再需要任何特定的排序。
但是:
如果你的worker函数基本上什么都不做,只做从网站下载和很少的CPU密集型处理,那么你可以(应该(通过简单地替换来使用线程池
from multiprocessing.pool import ThreadPool
...
MAX_THREADS_TO_USE = 100 # or maybe even larger!!!
pool_size = min(MAX_THREADS_TO_USE, len(stockUniverse))
pool = ThreadPool(pool_size)
由于所有线程共享相同的地址空间,因此可以按原样使用原始的辅助函数func