I have a function that I'm trying to apply in parallel, and within that function I call another function that I think would also benefit from parallel execution. The goal is to take each field's crop yields for multiple years and combine them all into one pandas dataframe. I have a function that finds the nearest point between each pair of dataframes, but it's quite intensive and takes some time. I'd like to speed it up.

I tried creating a pool and using map_async on the inner function. I also tried doing the same with the loop of the outer function. The latter is the only thing I got working the way I wanted. I can use this, but I know there has to be a way to make it faster. Check out the code below:
from multiprocessing import Pool
from geopy import distance
import pandas as pd

return_columns = []
return_columns_cb = lambda x: return_columns.append(x)

def getnearestpoint(gdA, gdB, retcol):
    dist = lambda point1, point2: distance.great_circle(point1, point2).feet

    def find_closest(point):
        # geopy's great_circle expects (latitude, longitude) ordering
        distances = gdB.apply(
            lambda row: dist(point, (row["Latitude"], row["Longitude"])), axis=1
        )
        return (gdB.loc[distances.idxmin(), retcol], distances.min())

    append_retcol = gdA.apply(
        lambda row: find_closest((row["Latitude"], row["Longitude"])), axis=1
    )
    return append_retcol
def combine_yield(field):
    # field is a list of the files for the field I'm working with
    # lots of pre-processing
    # dfs in this case is a list of the dataframes for the current field
    # mdf is the dataframe with the most points, which I popped from this list
    p = Pool()
    for i in range(0, len(dfs)):
        p.apply_async(getnearestpoint, args=(mdf, dfs[i], dfs[i].columns[-1]),
                      callback=return_columns_cb)
    # wait for all workers to finish so every callback has fired
    p.close()
    p.join()
    for col in return_columns:
        mdf = mdf.append(col)
    '''I unzip my points back to longitude and latitude here in the final
    dataframe so I can write to csv without tuples'''
    mdf[["Longitude", "Latitude"]] = pd.DataFrame(
        mdf["Point"].tolist(), index=mdf.index
    )
    return mdf
def multiprocess_combine_yield():
    '''do stuff to get dictionary below with each field name as key and values
    as all the files for that field'''
    yield_by_field = {'C01': ('files...'), ...}
    # The farm I'm working on has 30 fields, and the loop below is too slow
    for k, v in yield_by_field.items():
        combine_yield(v)
I guess what I need help with is this: I envision using a pool to imap or apply_async over each tuple of files in the dictionary. Then, in the combine_yield function as applied to that tuple of files, I'd like to be able to parallelize the distance function as well. That function bogs the program down, because it calculates the distance between every pair of points across two dataframes to get each year's yield. The files average around 1,200 data points each, and multiplying that across 30 fields, I need something better. Maybe the efficiency gain lies in finding a better way of pulling in the nearest point. I still need something that gives me the value from gdB along with the distance, because of what I do later when selecting which rows from the 'mdf' dataframe to use.
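To make the shape of what I'm after concrete, here's a rough, untested sketch of the top-level parallelism I have in mind. Note that the inner Pool can't simply be nested inside this one, since Pool workers are daemonic processes and aren't allowed to spawn children of their own:

from multiprocessing import Pool

def multiprocess_combine_yield(yield_by_field):
    results = []
    with Pool() as p:
        # imap hands back each field's dataframe lazily, as it finishes
        for mdf in p.imap(combine_yield, yield_by_field.values()):
            results.append(mdf)
    return results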
Thanks to @ALollz's comment, I figured it out. I went back to my getnearestpoint function, and instead of doing a bunch of Series.apply I am now using cKDTree from scipy.spatial to find the closest point, and then a vectorized haversine distance to calculate the true distance for each of the matched points. Much, much faster. Here are the basics of the code:
import re
from itertools import zip_longest

import numpy as np
import pandas as pd
from scipy.spatial import cKDTree

def getnearestpoint(gdA, gdB, retcol):
    gdA_coordinates = np.array(
        list(zip(gdA.loc[:, "Longitude"], gdA.loc[:, "Latitude"]))
    )
    gdB_coordinates = np.array(
        list(zip(gdB.loc[:, "Longitude"], gdB.loc[:, "Latitude"]))
    )
    tree = cKDTree(data=gdB_coordinates)
    distances, indices = tree.query(gdA_coordinates, k=1)
    # These column names are done as so due to the formatting of my 'retcols'.
    # Note that tree.query returns positional indices, so the .loc lookups
    # below rely on gdB having a default RangeIndex.
    df = pd.DataFrame.from_dict(
        {
            f"Longitude_{retcol[:4]}": gdB.loc[indices, "Longitude"].values,
            f"Latitude_{retcol[:4]}": gdB.loc[indices, "Latitude"].values,
            retcol: gdB.loc[indices, retcol].values,
        }
    )
    gdA = pd.merge(left=gdA, right=df, left_on=gdA.index, right_on=df.index)
    gdA.drop(columns="key_0", inplace=True)
    return gdA
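One caveat about the approach above: the tree is built on raw longitude/latitude pairs, so tree.query ranks neighbors by Euclidean distance in degrees. That's a fine approximation for points packed into a single field, but it degrades near the poles or across the antimeridian. If that ever matters, one option is to move to 3D Cartesian coordinates first; a sketch, where to_cartesian is a hypothetical helper and not part of my code:

import numpy as np
from scipy.spatial import cKDTree

def to_cartesian(lon_deg, lat_deg, radius=2.0902e7):
    """Hypothetical helper: lon/lat in degrees -> 3D points on a sphere
    (radius in feet). Euclidean nearest-neighbor in this space agrees with
    great-circle nearest-neighbor, since chord length grows monotonically
    with the central angle."""
    lon = np.deg2rad(np.asarray(lon_deg, dtype=float))
    lat = np.deg2rad(np.asarray(lat_deg, dtype=float))
    return np.column_stack((
        radius * np.cos(lat) * np.cos(lon),
        radius * np.cos(lat) * np.sin(lon),
        radius * np.sin(lat),
    ))

# In getnearestpoint this would replace the degree-space arrays:
#   tree = cKDTree(to_cartesian(gdB["Longitude"], gdB["Latitude"]))
#   _, indices = tree.query(to_cartesian(gdA["Longitude"], gdA["Latitude"]), k=1)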
def combine_yield(field):
    # same preprocessing as before
    for i in range(0, len(dfs)):
        mdf = getnearestpoint(mdf, dfs[i], dfs[i].columns[-1])
    # haversine needs radians, so convert the main coordinates too
    main_coords = np.deg2rad(np.array(list(zip(mdf.Longitude, mdf.Latitude))))
    lat_main = main_coords[:, 1]
    longitude_main = main_coords[:, 0]
    longitude_cols = [
        c for c in mdf.columns for m in [re.search(r"Longitude_\d{4}", c)] if m
    ]
    latitude_cols = [
        c for c in mdf.columns for m in [re.search(r"Latitude_\d{4}", c)] if m
    ]
    year_coords = list(zip_longest(longitude_cols, latitude_cols, fillvalue=np.nan))
    for i in year_coords:
        year = re.search(r"\d{4}", i[0]).group(0)
        coords = np.deg2rad(np.array(list(zip(mdf.loc[:, i[0]], mdf.loc[:, i[1]]))))
        lat_year = coords[:, 1]
        longitude_year = coords[:, 0]
        diff_lat = lat_main - lat_year
        diff_lng = longitude_main - longitude_year
        # vectorized haversine; 2.0902e7 is Earth's radius in feet
        d = (
            np.sin(diff_lat / 2) ** 2
            + np.cos(lat_main) * np.cos(lat_year) * np.sin(diff_lng / 2) ** 2
        )
        mdf[f"{year} Distance"] = 2 * (2.0902 * 10 ** 7) * np.arcsin(np.sqrt(d))
    return mdf
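Since this swaps geopy's great_circle for a hand-rolled haversine, a quick sanity check is cheap. The two sample points below are arbitrary; the tiny disagreement comes from geopy using a slightly different mean Earth radius:

import numpy as np
from geopy import distance

# two arbitrary sample points, given as (latitude, longitude) for geopy
pt1, pt2 = (42.03, -93.65), (42.05, -93.70)

lat1, lon1 = np.deg2rad(pt1)
lat2, lon2 = np.deg2rad(pt2)
d = (np.sin((lat2 - lat1) / 2) ** 2
     + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2) ** 2)
haversine_ft = 2 * (2.0902 * 10 ** 7) * np.arcsin(np.sqrt(d))

geopy_ft = distance.great_circle(pt1, pt2).feet
print(haversine_ft, geopy_ft)  # agree to well under 0.1%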
Then I just do Pool.map(combine_yield, (v for k, v in yield_by_field.items())), and that made a substantial difference. Hopefully it helps anyone else in a similar predicament.
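For completeness, a minimal sketch of that driver with the usual multiprocessing safeguards (the __main__ guard is required on platforms that spawn rather than fork workers, e.g. Windows; yield_by_field is the same dict as above):

from multiprocessing import Pool

if __name__ == "__main__":
    # yield_by_field maps each field name to the tuple of its files
    with Pool() as p:
        combined = p.map(combine_yield, list(yield_by_field.values()))
    # combined is now the list of per-field mdf dataframes, ready to write out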