提高函数的性能(速度)



我有一个病人及其结果的数据库。下面是示例数据框架:

import pandas as pd
import numpy as np
from scipy.stats import linregress
data =  [[1 , '20210201', 4567, 40],
[1 , '20210604', 4567, 55],
[1 , '20200405', 2574, 42],
[1 , '20210602', 2574, 55],
[2 , '20210201', 4567, 25],
[2 , '20210604', 4567, 32],
[2 , '20200405', 2574, 70],
[2 , '20210602', 2574, 46]]
df = pd.DataFrame(data, columns=['id', 'date', 'test_id', 'result'])
df.date = pd.to_datetime(df.date, format='%Y%m%d') # format date field
df
id       date  test_id  result
0   1 2021-02-01     4567      40
1   1 2021-06-04     4567      55
2   1 2020-04-05     2574      42
3   1 2021-06-02     2574      55
4   2 2021-02-01     4567      25
5   2 2021-06-04     4567      32
6   2 2020-04-05     2574      70
7   2 2021-06-02     2574      46
data =  [[1 , '20220101'],
[2 , '20220102']]
customers = pd.DataFrame(data, columns=['id', 'start_date'])
customers.start_date = pd.to_datetime(customers.start_date, format='%Y%m%d') # format date field
print(customers)
id start_date
0   1 2022-01-01
1   2 2022-01-02

以及以下函数,该函数获取客户及其初始日期,并返回初始日期之前特定时间段内每个测试的汇总结果:

def patient_agg_results(df, patient_ID, X, Y, firstAF):
result = pd.DataFrame()
X_date = firstAF - pd.DateOffset(months=X)
Y_date = firstAF - pd.DateOffset(months=X+Y)
# get results of specific patient within the timeframe
patient_results = df[(df['id'] == patient_ID) & (df['date'] < X_date) & (df['date'] > Y_date)]  # ***
if (len(patient_results) > 0 ):
# Calculate mean
curr_result = pd.DataFrame(patient_results.groupby('test_id').mean()['result'])
curr_result = curr_result.set_index(curr_result.index.astype(str) + '_mean')
result = pd.concat([result,curr_result])
# Calculate newest result
curr_result = pd.DataFrame(patient_results.groupby('test_id').max()['result'])
curr_result = curr_result.set_index(curr_result.index.astype(str) + '_new')
result = pd.concat([result,curr_result])
# Calculate oldest result
curr_result = pd.DataFrame(patient_results.groupby('test_id').min()['result'])
curr_result = curr_result.set_index(curr_result.index.astype(str) + '_old')
result = pd.concat([result,curr_result])
# Calculate STD
curr_result = pd.DataFrame(patient_results.groupby('test_id').std()['result'])
curr_result = curr_result.set_index(curr_result.index.astype(str) + '_std')
result = pd.concat([result,curr_result])
# Calculate slope
patient_results['int_date'] = pd.to_datetime(patient_results['date']).astype(np.int64) # create integer date
curr_result = pd.DataFrame(patient_results.groupby('test_id')['result', 'int_date'].apply(lambda v: linregress(v.int_date, v.result)[0]))
curr_result.columns = ['result']
curr_result = curr_result.set_index(curr_result.index.astype(str) + '_slope')
result = pd.concat([result,curr_result])
result['id'] = patient_ID
return result.to_dict()

我这样使用这个函数:

customers['lab_results'] = customers.apply(lambda row: patient_agg_results(df,row['id'],12,12,row['start_date']),axis=1)

问题是我的原始数据集包括大约一百万名患者和几百万个结果,这需要这段代码运行几天。最耗时的行是过滤行(注释:***)

有什么办法让它更有效率吗?

这是一个很好的问题。通常,当所有数据都在内存中,然后运行数天时,这是因为它们遇到了执行组合的噩梦。您指向选择行的指针作为最大时间消耗者是完全有意义的-这是一个cpu限制的进程。

如果您假设df有300万行,那么在该过滤器行中-因为它是apply函数的一部分-有900万次比较,然后发生100万次。这是一个巨大的数字,可能会导致数天的处决。Spark在处理大数据方面非常出色,但我怀疑您在Spark中也会遇到同样的cpu限制组合噩梦。

您可以从另一个方向获得巨大的好处-从df获得groupby

df.grouby('id').apply(…)

来自groupbyapply函数将接收每个id的所有列和行的完整子帧。使用这种方法,您将对每个id进行平均3行乘以2的日期选择,而不是300万次乘以2。重复的300万行id选择完全消失了。在初始groupby操作期间处理一次。

df.groupby.apply函数中,您可以通过使用customer.at[id, 'start_date']customer数据帧中查找所需的内容。(在将customer数据帧的索引设置为id之后——这将加快速度)

通过合并函数中的groupby调用集而不是使用一个groupby.agg(),可能会节省一些费用。虽然我没有机会完全消化这一点来提出一个更明确的建议。

一旦你控制了组合,你可以去这里寻找一些提高性能的好方法:https://pandas.pydata.org/docs/user_guide/enhancingperf.html

天哪,我喜欢这种问题!我希望我有更多的时间。

这里是一些初学者代码来传达这个想法

customers = customers.set_index('id')
def jch_agg_result(gf, X, Y):
firstAF = customers.at[gf['id'].iat[0], 'start_date']
X_date = firstAF - pd.DateOffset(months=X)
Y_date = firstAF - pd.DateOffset(months=X+Y)

patient_results = gf[(gf['date'] < X_date) & (gf['date'] > Y_date)] 

if (len(patient_results) > 0 ):
tid = gf['test_id'].iat[0]
m = {f'{tid}_mean': gf['result'].mean(), f'{tid}_max': gf['result'].max()}
return m

return np.nan

:

df.groupby(['id','test_id']).apply(jch_agg_result,12,12).dropna()
id  test_id
1   2574       {'2574_mean': 48.5, '2574_max': 55}
2   2574       {'2574_mean': 58.0, '2574_max': 70}

你只需要合并回适用的日期来得到你想要的结果。不确定我是否得到了内部平均值,最大值等,但它在那里,所以你可以修改以满足你的需要。

PySpark应该能够帮助您。可能还有其他更快的解决方案,但这将是快速运行和实现。PySpark和Pandas之间的大多数函数通常是相似的,根据我对大型数据集的简单操作的经验,PySpark应该可以帮助您。

相关内容

  • 没有找到相关文章

最新更新