这是我的代码:
import pandas as pd
import multiprocessing as mp
CPU = 4
inp = pd.DataFrame({ 'col': ['a', 'b'] })
def test(dataframe):
df = dataframe.copy()
def worker(data):
print('worker')
def callback(data):
print('callback')
pool = mp.Pool(CPU)
for idx, row in df.iterrows():
print((idx, row['col']))
pool.apply_async(worker, args=[(idx, row['col'])], callback=callback)
pool.close()
pool.join()
return df
test(inp)
如果我在上部范围内运行(没有包含在test
函数中(,但它按预期工作,但在将其包含在另一个函数中后 - 它们只是没有被调用。
这是我通过test
函数收到的输出:
(0, 'a')
(1, 'b')
没有:
(0, 'a')
(1, 'b')
worker
worker
callback
callback
所以问题是 -如何让它在另一个函数中工作?
来自multiprocessing
模块文档:
安全导入主模块
确保主模块可以由新的 Python 解释器安全地导入,而不会引起意外的副作用(例如 开始一个新过程(。
注意此包中的功能要求
__main__
模块可由子模块导入。这在 编程指南,但值得在这里指出。
正确的方法之一如下:
import pandas as pd
import multiprocessing as mp
CPU = 4
inp = pd.DataFrame({'col': ['a', 'b']})
def worker(data):
print(data)
print('worker')
def callback(data):
print('callback')
def test(dataframe):
df = dataframe.copy()
with mp.Pool(CPU) as pool:
for idx, row in df.iterrows():
result = pool.apply_async(worker, args=[(idx, row['col'])], callback=callback)
result.wait()
return df
if __name__ == '__main__':
test(inp)
输出:
(0, 'a')
worker
(1, 'b')
worker
callback
callback