How can I improve the performance of coincidence filtering on time series?



I am working with non-stationary experimental data from fluid dynamics. We have measurements on three channels, so the samples never coincide directly (they are not taken at the same instants). I want to filter them with a windowing scheme so that only coincident samples are kept and all others are discarded.

Unfortunately, I cannot upload the original dataset due to company restrictions. But I have put together a minimal example that generates a similar (smaller) dataset. The original dataset consists of 500000 values per channel, each labelled with an arrival time; coincidence is checked via these timestamps.

Right now, I iterate over every sample of the first channel and look at the time differences to the other channels. If a difference is smaller than the specified window width, the index is saved. The check would probably be a bit faster if I restricted it to an interval (e.g. 100 or 1000 samples in the neighbourhood), but since the data rates can differ strongly between channels, I have not implemented that. If possible, I would prefer to get rid of looping over every sample.

import numpy as np
import pandas as pd

def filterCoincidence(df, window=50e-6):
    '''
    Filters the dataset with arbitrary different data rates on different channels to coincident samples.
    The coincidence is checked with regard to a time window specified as argument.
    '''
    AT_cols = [col for col in df.columns if 'AT' in col]
    if len(AT_cols) == 1:
        print('only one group available')
        return
    used_ix = np.zeros((df.shape[0], len(AT_cols)))
    used_ix.fill(np.nan)
    for ix, sample in enumerate(df[AT_cols[0]]):
        used_ix[ix, 0] = ix
        test_ix = np.zeros(2)
        for ii, AT_col in enumerate(AT_cols[1:]):
            # all samples of this channel within the window around the current sample
            diff = np.abs(df[AT_col] - sample)
            index = diff[diff <= window].sort_values().index.values
            if len(index) == 0:
                test_ix[ii] = None
                continue
            # nearest partner that has not been used yet (None becomes NaN)
            test_ix[ii] = [ix_use if (ix_use not in used_ix[:, ii + 1] or ix == 0) else None for ix_use in index][0]
        if not np.any(np.isnan(test_ix)):
            used_ix[ix, 1:] = test_ix
        else:
            used_ix[ix, 1:] = [None, None]
    used_ix = used_ix[~np.isnan(used_ix).any(axis=1)]
    print(used_ix.shape)
    return

no_points = 10000
no_groups = 3
meas_duration = 60
df = pd.DataFrame(np.transpose([np.sort(np.random.rand(no_points)*meas_duration) for _ in range(no_groups)]),
                  columns=['AT {}'.format(i) for i in range(no_groups)])
filterCoincidence(df, window=1e-3)
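
One way to avoid looping over every sample: with the arrival times sorted, np.searchsorted can locate, for each sample of a reference channel, the range of candidates on another channel inside the window in a single vectorized call. A minimal sketch (the function name window_candidates and the test arrays are only illustrative):

import numpy as np

def window_candidates(ref, other, window):
    '''For every ref[i], other[lo[i]:hi[i]] are all samples within +/- window.'''
    lo = np.searchsorted(other, ref - window, side='left')
    hi = np.searchsorted(other, ref + window, side='right')
    return lo, hi

ref = np.sort(np.random.rand(1000) * 60)
other = np.sort(np.random.rand(1500) * 60)
lo, hi = window_candidates(ref, other, window=1e-3)
has_partner = hi > lo   # True where at least one coincident sample exists
print(has_partner.sum(), 'of', ref.size, 'reference samples have a partner')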

Is there an existing module that implements this kind of filtering? Either way, any hints on how to improve the performance of my code would be great.
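
For what it's worth, pandas ships a building block for the two-channel case: pandas.merge_asof matches each row to its nearest neighbour within a given tolerance. A minimal sketch, assuming sorted keys and illustrative column names; extending it to three channels still needs extra logic:

import numpy as np
import pandas as pd

left = pd.DataFrame({'AT 0': np.sort(np.random.rand(1000) * 60)})
right = pd.DataFrame({'AT 1': np.sort(np.random.rand(800) * 60)})
# nearest neighbour within +/- tolerance; unmatched rows get NaN and are dropped
matched = pd.merge_asof(left, right, left_on='AT 0', right_on='AT 1',
                        direction='nearest', tolerance=1e-3).dropna()
print(matched.head())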

Just to update this thread in case someone else has a similar problem: I think that, after several code revisions, I have found a suitable solution.

import datetime
import itertools
import numpy as np
import pandas as pd

def filterCoincidence(AT1, AT2, AT3, window=0.05e-3):
    '''
    Filters the dataset with arbitrary different data rates on different channels to coincident samples.
    The coincidence is checked with regard to a time window specified as argument.
    - arguments:
        - three time series AT1, AT2 and AT3 (arrival times of particles in my case)
        - window size (50 microseconds as default setting)
    - output: indices of combined samples
    '''
    start_time = datetime.datetime.now()
    AT_list = [AT1, AT2, AT3]
    # restrict all channels to the common time span
    min_EndArrival = min(np.max(AT) for AT in AT_list)
    max_BeginArrival = max(np.min(AT) for AT in AT_list)
    for i, col in enumerate(AT_list):
        AT_list[i] = np.delete(col, np.where((col < max_BeginArrival - window) | (col > min_EndArrival + window)))
    # use the channel with the lowest data rate as reference
    num_points = np.zeros(len(AT_list))
    datarate = np.zeros(len(AT_list))
    for i, AT in enumerate(AT_list):
        num_points[i] = AT.shape[0]
        datarate[i] = num_points[i] / (AT[-1] - AT[0])
    used_ref = np.argmin(datarate)
    # process coincidence
    AT_ref_val = AT_list.pop(used_ref)
    overview = np.zeros((AT_ref_val.shape[0], 3), dtype=int)
    overview[:, 0] = np.arange(AT_ref_val.shape[0], dtype=int)
    borders = np.empty(2, dtype=object)
    max_diff = np.zeros(2, dtype=int)
    for i, AT in enumerate(AT_list):
        # candidate index range [lower, upper) inside the window, per reference sample
        neighbors_lower = np.searchsorted(AT, AT_ref_val - window, side='left')
        neighbors_upper = np.searchsorted(AT, AT_ref_val + window, side='left')
        borders[i] = np.transpose([neighbors_lower, neighbors_upper])
        coinc_ix = np.where(np.diff(borders[i], axis=1).flatten() != 0)[0]
        max_diff[i] = np.max(np.diff(borders[i], axis=1))
        overview[coinc_ix, i + 1] = 1
    # keep only reference samples that have candidates on both other channels
    use_ix = np.where(~np.any(overview[:, 1:] == 0, axis=1))
    borders[0] = borders[0][use_ix]
    borders[1] = borders[1][use_ix]
    overview = overview[use_ix]
    # create all possible combinations referring to the reference
    combinations = np.prod(max_diff)
    test = np.empty((overview.shape[0] * combinations, 3), dtype=object)
    for i, [ref_ix, at1, at2] in enumerate(zip(overview[:, 0], borders[0], borders[1])):
        test[i * combinations:i * combinations + combinations, 0] = ref_ix
        at1 = np.arange(at1[0], at1[1])
        at2 = np.arange(at2[0], at2[1])
        test[i * combinations:i * combinations + at1.shape[0] * at2.shape[0], 1:] = np.asarray(list(itertools.product(at1, at2)))
    test = test[~np.any(pd.isnull(test), axis=1)]
    # check pairwise distances of every combination
    ix_ref = test[:, 0]
    test = test[:, 1:]
    test = np.insert(test, used_ref, ix_ref, axis=1)
    test = test.astype(int)
    AT_list.insert(used_ref, AT_ref_val)
    AT_mat = np.zeros(test.shape)
    for i, AT in enumerate(AT_list):
        AT_mat[:, i] = AT[test[:, i]]
    distances = np.zeros((test.shape[0], len(list(itertools.combinations(range(3), 2)))))
    for i, AT in enumerate(itertools.combinations(range(3), 2)):
        distances[:, i] = np.abs(AT_mat[:, AT[0]] - AT_mat[:, AT[1]])
    ix = np.where(np.all(distances <= window, axis=1))[0]
    test = test[ix, :]
    distances = distances[ix, :]
    # check duplicates
    # use the maximum pairwise difference as similarity factor
    dist_sum = np.max(distances, axis=1)
    unique_sorted = np.argsort([np.unique(test[:, i]).shape[0] for i in range(test.shape[1])])[::-1]
    test = np.hstack([test, dist_sum.reshape(-1, 1)])
    test = test[test[:, -1].argsort()]
    for j in unique_sorted:
        # keep, per sample index, only the combination with the smallest distance
        _, ix = np.unique(test[:, j], return_index=True)
        test = test[ix, :]
    test = test[:, :3].astype(int)
    test = test[test[:, used_ref].argsort()]
    # check that the indices increase from one event to the next
    ix = np.where(np.any(np.diff(test, axis=0) > 0, axis=1))[0]
    ix = np.append(ix, test.shape[0] - 1)
    test = test[ix, :]
    print('{} coincident samples obtained in {}.'.format(test.shape[0], datetime.datetime.now() - start_time))
    return test
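
A minimal usage sketch with synthetic data, analogous to the generator from the question (the window value is just an example):

import numpy as np

no_points = 10000
meas_duration = 60
AT1, AT2, AT3 = (np.sort(np.random.rand(no_points) * meas_duration) for _ in range(3))
matches = filterCoincidence(AT1, AT2, AT3, window=1e-3)
# matches[:, i] holds, per coincidence event, the sample index into channel i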

I am sure there are better solutions, but for now it works for me. And I know the variable names should definitely be chosen more clearly (test, for example), but I will clean up my code at the end of my master's thesis... maybe :-)
