我有一个病人及其结果的数据库。下面是示例数据框架:
import pandas as pd
import numpy as np
from scipy.stats import linregress
data = [[1 , '20210201', 4567, 40],
[1 , '20210604', 4567, 55],
[1 , '20200405', 2574, 42],
[1 , '20210602', 2574, 55],
[2 , '20210201', 4567, 25],
[2 , '20210604', 4567, 32],
[2 , '20200405', 2574, 70],
[2 , '20210602', 2574, 46]]
df = pd.DataFrame(data, columns=['id', 'date', 'test_id', 'result'])
df.date = pd.to_datetime(df.date, format='%Y%m%d') # format date field
df
id date test_id result
0 1 2021-02-01 4567 40
1 1 2021-06-04 4567 55
2 1 2020-04-05 2574 42
3 1 2021-06-02 2574 55
4 2 2021-02-01 4567 25
5 2 2021-06-04 4567 32
6 2 2020-04-05 2574 70
7 2 2021-06-02 2574 46
data = [[1 , '20220101'],
[2 , '20220102']]
customers = pd.DataFrame(data, columns=['id', 'start_date'])
customers.start_date = pd.to_datetime(customers.start_date, format='%Y%m%d') # format date field
print(customers)
id start_date
0 1 2022-01-01
1 2 2022-01-02
以及以下函数,该函数获取客户及其初始日期,并返回初始日期之前特定时间段内每个测试的汇总结果:
def patient_agg_results(df, patient_ID, X, Y, firstAF):
result = pd.DataFrame()
X_date = firstAF - pd.DateOffset(months=X)
Y_date = firstAF - pd.DateOffset(months=X+Y)
# get results of specific patient within the timeframe
patient_results = df[(df['id'] == patient_ID) & (df['date'] < X_date) & (df['date'] > Y_date)] # ***
if (len(patient_results) > 0 ):
# Calculate mean
curr_result = pd.DataFrame(patient_results.groupby('test_id').mean()['result'])
curr_result = curr_result.set_index(curr_result.index.astype(str) + '_mean')
result = pd.concat([result,curr_result])
# Calculate newest result
curr_result = pd.DataFrame(patient_results.groupby('test_id').max()['result'])
curr_result = curr_result.set_index(curr_result.index.astype(str) + '_new')
result = pd.concat([result,curr_result])
# Calculate oldest result
curr_result = pd.DataFrame(patient_results.groupby('test_id').min()['result'])
curr_result = curr_result.set_index(curr_result.index.astype(str) + '_old')
result = pd.concat([result,curr_result])
# Calculate STD
curr_result = pd.DataFrame(patient_results.groupby('test_id').std()['result'])
curr_result = curr_result.set_index(curr_result.index.astype(str) + '_std')
result = pd.concat([result,curr_result])
# Calculate slope
patient_results['int_date'] = pd.to_datetime(patient_results['date']).astype(np.int64) # create integer date
curr_result = pd.DataFrame(patient_results.groupby('test_id')['result', 'int_date'].apply(lambda v: linregress(v.int_date, v.result)[0]))
curr_result.columns = ['result']
curr_result = curr_result.set_index(curr_result.index.astype(str) + '_slope')
result = pd.concat([result,curr_result])
result['id'] = patient_ID
return result.to_dict()
我这样使用这个函数:
customers['lab_results'] = customers.apply(lambda row: patient_agg_results(df,row['id'],12,12,row['start_date']),axis=1)
问题是我的原始数据集包括大约一百万名患者和几百万个结果,这需要这段代码运行几天。最耗时的行是过滤行(注释:***)
有什么办法让它更有效率吗?
这是一个很好的问题。通常,当所有数据都在内存中,然后运行数天时,这是因为它们遇到了执行组合的噩梦。您指向选择行的指针作为最大时间消耗者是完全有意义的-这是一个cpu限制的进程。
如果您假设df
有300万行,那么在该过滤器行中-因为它是apply
函数的一部分-有900万次比较,然后发生100万次。这是一个巨大的数字,可能会导致数天的处决。Spark在处理大数据方面非常出色,但我怀疑您在Spark中也会遇到同样的cpu限制组合噩梦。
您可以从另一个方向获得巨大的好处-从df
获得groupby
。
df.grouby('id').apply(…)
来自groupby
的apply
函数将接收每个id
的所有列和行的完整子帧。使用这种方法,您将对每个id
进行平均3行乘以2的日期选择,而不是300万次乘以2。重复的300万行id
选择完全消失了。在初始groupby
操作期间处理一次。
在df.groupby.apply
函数中,您可以通过使用customer.at[id, 'start_date']
从customer
数据帧中查找所需的内容。(在将customer
数据帧的索引设置为id
之后——这将加快速度)
通过合并函数中的groupby
调用集而不是使用一个groupby.agg()
,可能会节省一些费用。虽然我没有机会完全消化这一点来提出一个更明确的建议。
一旦你控制了组合,你可以去这里寻找一些提高性能的好方法:https://pandas.pydata.org/docs/user_guide/enhancingperf.html
天哪,我喜欢这种问题!我希望我有更多的时间。
这里是一些初学者代码来传达这个想法
customers = customers.set_index('id')
def jch_agg_result(gf, X, Y):
firstAF = customers.at[gf['id'].iat[0], 'start_date']
X_date = firstAF - pd.DateOffset(months=X)
Y_date = firstAF - pd.DateOffset(months=X+Y)
patient_results = gf[(gf['date'] < X_date) & (gf['date'] > Y_date)]
if (len(patient_results) > 0 ):
tid = gf['test_id'].iat[0]
m = {f'{tid}_mean': gf['result'].mean(), f'{tid}_max': gf['result'].max()}
return m
return np.nan
:
df.groupby(['id','test_id']).apply(jch_agg_result,12,12).dropna()
id test_id
1 2574 {'2574_mean': 48.5, '2574_max': 55}
2 2574 {'2574_mean': 58.0, '2574_max': 70}
你只需要合并回适用的日期来得到你想要的结果。不确定我是否得到了内部平均值,最大值等,但它在那里,所以你可以修改以满足你的需要。
PySpark应该能够帮助您。可能还有其他更快的解决方案,但这将是快速运行和实现。PySpark和Pandas之间的大多数函数通常是相似的,根据我对大型数据集的简单操作的经验,PySpark应该可以帮助您。