对循环进行多重处理



我有一个脚本,它在Panda数据帧上循环,并根据一些搜索和几何操作将GIS数据输出到地质包。当我使用for循环时,它是有效的,但对于超过4k的记录,它需要一段时间。由于我已经将它构建为它自己的函数,该函数根据行迭代返回我需要的内容,所以我尝试使用进行多处理来运行它

import pandas as pd, bwe_mapping
from multiprocessing import Pool

#Sample dataframe
bwes = [['id', 7216],['item_id', 3277841], ['Date', '2019-01-04T00:00:00.000Z'], ['start_lat', -56.92], ['start_lon', 45.87], ['End_lat', -59.87], ['End_lon', 44.67]]
bwedf = pd.read_csv(bwes)
geopackage = "datalocationgeopackage.gpkg"
tracklayer = "tracks"
if __name__=='__main__':
def task(item):
bwe_mapping.map_bwe(item, geopackage, tracklayer)
pool = Pool()
for index, row in bwedf.iterrows():
task(row)
with Pool() as pool:
for results in pool.imap_unordered(task, bwedf.iterrows()):
print(results)

当我运行这个时,我的任务管理器会填充16个新的python任务,但没有任何迹象表明正在执行任何任务。使用numpy.array.split((将我的pandas-df分解为4或8个更小的,并在bwedf.iterrows((中为每个数据帧在自己的处理器上运行for index行,这会更好吗?没有一个过程需要按照任何顺序进行;只要我可以将输出存储到一个列表中,最后连接到地质包层中,这些输出就是地质包数据帧。我应该在函数中放入for循环,然后将整个数据帧和gis数据传递给它进行搜索吗?

如果您在windows/macOS上运行,那么它将使用spawn来创建工作线程,这意味着任何子都必须在导入主脚本时找到要执行的函数。

你的代码在if __name__=='__main__':中有函数定义,所以孩子们不能访问它。

简单地将函数CCD_ 3移动到CCD_。

现在发生的情况是,当它试图运行一个函数时,每个孩子都崩溃了,因为它从来没有看到它的定义。

再现问题的最小代码:

from multiprocessing import Pool
if __name__ == '__main__':
def task(item):
print(item)
return item
pool = Pool()
with Pool() as pool:
for results in pool.imap_unordered(task, range(10)):
print(results)

并且解决方案是将函数定义移动到CCD_ 5行之前。

编辑:现在要迭代数据帧中的行,这个简单的例子演示了如何做到这一点,请注意,iterrows返回一个索引和一行,这就是它被解包的原因。

import os
import pandas as pd
from multiprocessing import Pool
import time
# Sample dataframe
bwes = [['id', 7216], ['item_id', 3277841], ['Date', '2019-01-04T00:00:00.000Z'], ['start_lat', -56.92],
['start_lon', 45.87], ['End_lat', -59.87], ['End_lon', 44.67]]
bwef = pd.DataFrame(bwes)
def task(item):
time.sleep(1)
index, row = item
# print(os.getpid(), tuple(row))
return str(os.getpid()) + " " + str(tuple(row))

if __name__ == '__main__':
with Pool() as pool:
for results in pool.imap_unordered(task, bwef.iterrows()):
print(results)

time.sleep(1)之所以存在,是因为只有少量的工作,一个工人可能会抢光所有的工作,所以我强迫每个工人等待其他工人,你应该删除它,结果如下:

13228 ('id', 7216)
11376 ('item_id', 3277841)
15580 ('Date', '2019-01-04T00:00:00.000Z')
10712 ('start_lat', -56.92)
11376 ('End_lat', -59.87)
13228 ('start_lon', 45.87)
10712 ('End_lon', 44.67)

它看起来像是你的";示例";数据帧是转置的,但您只需要正确地构造数据帧,我建议您首先使用iterrows串行运行代码,然后在多个核心上运行它。

很明显,将数据发送给工作者和从他们那里返回需要时间,所以要确保每个工作者都在做大量的计算工作,而不仅仅是将数据发送回父进程。

最新更新