I have a dataframe that looks like this
location_id device_id phase timestamp date day_no hour quarter_hour data_received data_sent
10001 1001 Phase 1 2023-01-30 00:00:00 2023-01-30 1 0 00:00:00 150 98
10001 1001 Phase 1 2023-01-30 00:15:00 2023-01-30 1 0 00:15:00 130 101
10001 1001 Phase 1 2023-01-30 00:45:00 2023-01-30 1 0 00:45:00 121 75
10001 1001 Phase 1 2023-01-30 01:00:00 2023-01-30 1 1 01:00:00 104 110
10001 1001 Phase 1 2023-01-30 01:15:00 2023-01-30 1 1 01:15:00 85 79
10001 1001 Phase 1 2023-01-30 01:30:00 2023-01-30 1 1 01:30:00 127 123
. . . . . . . . . .
10001 1001 Phase 1 2023-02-03 23:30:00 2023-02-03 5 23 23:30:00 100 83
10001 1001 Phase 1 2023-02-03 23:45:00 2023-02-03 5 23 23:45:00 121 75
10001 1005 Phase 2 2023-02-15 02:15:00 2023-02-15 1 2 02:15:00 90 101
10001 1005 Phase 2 2023-02-15 02:30:00 2023-02-15 1 2 02:30:00 111 98
. . . . . . . . . .
10001 1005 Phase 2 2023-02-19 23:15:00 2023-02-19 5 23 23:15:00 154 76
10001 1005 Phase 2 2023-02-19 23:30:00 2023-02-19 5 23 23:30:00 97 101
10003 1010 Phase 2 2023-01-14 00:00:00 2023-01-14 1 0 00:00:00 112 87
10003 1010 Phase 2 2023-01-14 00:15:00 2023-01-14 1 0 00:15:00 130 101
10003 1010 Phase 2 2023-01-14 00:30:00 2023-01-14 1 0 00:30:00 89 91
. . . . . . . . . .
10003 1010 Phase 2 2023-01-18 23:45:00 2023-01-18 5 23 23:45:00 123 117
Each location has several phases that occur one after another, and each phase has a device assigned to it for 5 days. During those 5 days the device's status is recorded in the dataframe every 15 minutes. The dataframe can have missing rows, and the data for an earlier phase may be missing entirely (not uploaded yet); these missing rows need to be identified. Sometimes a device is only switched on some time after its phase has started, and rows can be missing while the device is still off; those rows should not be flagged as missing data. The expected output for the dataframe above looks like this
location_id phase day_no hour quarter_hour
10001 Phase 1 1 0 00:30:00
10001 Phase 2 5 23 23:45:00
10003 Phase 1 1 0 00:00:00
10003 Phase 1 1 0 00:15:00
10003 Phase 1 1 0 00:30:00
. . . . .
10003 Phase 1 5 23 23:45:00
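To make the rule concrete: a slot counts as expected once both its day and its quarter hour are on or after the device's first reported reading for that phase. A minimal sketch of that rule (the `expected_slots` helper is hypothetical and not part of my script):

import pandas as pd

# Hypothetical helper, only to illustrate which slots a phase is expected to
# cover: every 15-minute slot of the 5 days, skipping slots whose day or
# quarter hour is earlier than the device's first reported reading.
def expected_slots(start_day, start_time, n_days=5):
    quarter_hours = pd.date_range("00:00", "23:45", freq="15min").time
    return [(day, qh)
            for day in range(1, n_days + 1)
            for qh in quarter_hours
            if day >= start_day and qh >= start_time]

# e.g. a phase first seen on day 1 at 02:15 is expected to cover the
# 87 slots from 02:15 to 23:45 on each of the 5 days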
My current code, shown below, works and finds all the missing rows, but it is very slow; the following line in particular is the bottleneck of my script:
day_quarter_hour_data_check = quarter_hourly_data_df[(quarter_hourly_data_df['location_id'].isin([location_id])) & (quarter_hourly_data_df['phase'].isin([phase])) & (quarter_hourly_data_df['day_no'].isin([cur_day])) & (quarter_hourly_data_df['quarter_hour'].isin([quarter_hour]))]['timestamp']
import pandas as pd

quarter_hourly_data_df = pd.read_csv('location_quarter_hourly_data.csv')
quarter_hourly_data_df = quarter_hourly_data_df.astype({'timestamp':'datetime64[ns]'})
quarter_hourly_data_df['quarter_hour'] = quarter_hourly_data_df['timestamp'].dt.time
start_times_data_df = quarter_hourly_data_df.groupby(['location_id','device_id','phase']).agg(start_time=pd.NamedAgg(column="timestamp", aggfunc="min"),day_no=pd.NamedAgg(column="day_no", aggfunc="min")).reset_index()
start_times_data_df['start_time'] = start_times_data_df['start_time'].astype('datetime64[ns]')
start_times_data_df['quarter_hour'] = start_times_data_df['start_time'].dt.time
location_ids = start_times_data_df['location_id'].unique()
location_ids.sort()
phases = start_times_data_df['phase'].unique()
phases.sort()
location_phases_df = quarter_hourly_data_df[['location_id','phase']].drop_duplicates()
location_last_phase_df = location_phases_df.groupby(['location_id']).agg(last_phase=pd.NamedAgg(column='phase', aggfunc='max')).reset_index()
# location_df is assumed to be loaded elsewhere (not shown in this snippet)
location_site_df = location_df[['site_id','location_id']].drop_duplicates()
location_site_df.rename(columns={"site_id": "site_no"}, inplace=True)
quarter_hours = pd.date_range("00:00", "23:45", freq="15min").time
location_phase_quarter_hour_missing_data = []
for location_id in location_ids:
    location_last_phase = location_last_phase_df[(location_last_phase_df['location_id'].isin([location_id]))]['last_phase'].values[0]
    for phase in phases:
        if int(phase[6]) <= int(location_last_phase[6]):
            # check if phase exists or not
            location_phase_data_check = start_times_data_df[(start_times_data_df['location_id'].isin([location_id])) & (start_times_data_df['phase'].isin([phase]))]['start_time']
            # if phase exists get start time of phase
            if not location_phase_data_check.empty:
                location_start_day = start_times_data_df[(start_times_data_df['location_id'].isin([location_id])) & (start_times_data_df['phase'].isin([phase]))]['day_no'].values[0]
                location_start_time = start_times_data_df[(start_times_data_df['location_id'].isin([location_id])) & (start_times_data_df['phase'].isin([phase]))]['quarter_hour'].values[0]
                for day_no in range(5):
                    cur_day = day_no + 1
                    for quarter_hour in quarter_hours:
                        if (quarter_hour >= location_start_time) and (cur_day >= location_start_day):
                            # check if data exists for every quarter hour after start time
                            day_quarter_hour_data_check = quarter_hourly_data_df[(quarter_hourly_data_df['location_id'].isin([location_id])) & (quarter_hourly_data_df['phase'].isin([phase])) & (quarter_hourly_data_df['day_no'].isin([cur_day])) & (quarter_hourly_data_df['quarter_hour'].isin([quarter_hour]))]['timestamp']
                            # if data doesn't exist add row to missing data
                            if day_quarter_hour_data_check.empty:
                                location_phase_quarter_hour_missing_data.append({'location_id':location_id, 'phase':phase, 'day_no':cur_day, 'hour':pd.to_datetime(quarter_hour, format='%H:%M:%S').hour, 'quarter_hour':quarter_hour})
            else:  # if phase doesn't exist add all rows to missing data
                for day_no in range(5):
                    cur_day = day_no + 1
                    for quarter_hour in quarter_hours:
                        location_phase_quarter_hour_missing_data.append({'location_id':location_id, 'phase':phase, 'day_no':cur_day, 'hour':pd.to_datetime(quarter_hour, format='%H:%M:%S').hour, 'quarter_hour':quarter_hour})
location_phase_missing_data_df = pd.DataFrame(location_phase_quarter_hour_missing_data)
location_phase_missing_data_df.to_csv('location_phase_missing_rows.csv', index=False)
Is there another way to identify the missing rows faster, or any way to optimize this code?
I'm not going to give you copy/paste code but rather the strategy I would use; the code below almost certainly has a few bugs in it.
import pandas

df = pandas.read_csv("input.csv", parse_dates=["timestamp"])
# assume it looks something like the df listed above

def check_a_single_device_entries(entries):
    # sort by timestamp
    entries = entries.sort_values("timestamp")
    # get the diffs between consecutive timestamps
    diffs = entries['timestamp'].shift(-1) - entries['timestamp']
    # return the rows that are followed by a gap larger than 15 minutes
    return entries[diffs > pandas.Timedelta(minutes=15)]

# group by device/location and run the check on each group
device_groups = df.groupby(["location_id", "device_id"])
problems_per_device = device_groups.apply(check_a_single_device_entries)
print(problems_per_device)  # from here you can probably get to your desired state
This mainly takes advantage of pandas' vectorisation: instead of iterating over every row you only iterate manually over each location/device combination, which in most cases should be much faster.
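If it helps, here is one possible (untested) way to go from those per-group gaps to the exact columns in your expected output (location_id, phase, day_no, hour, quarter_hour). The `missing_slots` helper is just mine for this sketch, it assumes pandas >= 1.4 (for `inclusive=`), and it treats every gap longer than 15 minutes inside a phase as missing data, expanding it into its individual 15-minute slots:

import pandas as pd

df = pd.read_csv("location_quarter_hourly_data.csv", parse_dates=["timestamp"])

def missing_slots(group):
    # every 15-minute slot strictly between two consecutive readings of one
    # location/phase is treated as a missing row
    ts = group["timestamp"].sort_values()
    slots = []
    for prev, nxt in zip(ts, ts.shift(-1)):
        if pd.notna(nxt) and nxt - prev > pd.Timedelta(minutes=15):
            slots.extend(pd.date_range(prev, nxt, freq="15min", inclusive="neither"))
    return pd.DataFrame({"timestamp": pd.Series(slots, dtype="datetime64[ns]")})

pieces = []
for (location_id, phase), group in df.groupby(["location_id", "phase"]):
    gaps = missing_slots(group)
    gaps["location_id"] = location_id
    gaps["phase"] = phase
    # day_no is counted from the day of the first reading of this phase
    phase_start = group["timestamp"].min().normalize()
    gaps["day_no"] = (gaps["timestamp"].dt.normalize() - phase_start).dt.days + 1
    pieces.append(gaps)

missing = pd.concat(pieces, ignore_index=True)
missing["hour"] = missing["timestamp"].dt.hour
missing["quarter_hour"] = missing["timestamp"].dt.time
print(missing[["location_id", "phase", "day_no", "hour", "quarter_hour"]])

Note that, like the diff-based check above, this only finds gaps between two existing readings; rows missing at the very start or end of a phase, or a phase with no data at all (like Phase 1 for location 10003 in your example), would still need a separate comparison against the full 5-day grid.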