计算给定开始和结束时间的并发会话



我需要能够根据数百万行数据计算出在任何给定时间每分钟正在运行的会话数,如下所示。

我尝试融化数据帧并创建了一个等于 1 或 -1 的新列,具体取决于它是开始还是结束。总结一下并按时间分组,我希望会奏效。

问题是,很多会话可能会在前一天开始,因为我只能查询今天,所以我最终会在凌晨得到负 200 万,因为这些会话结束了。

有没有一种好方法可以做到这一点并绘制图表,以便它显示任何给定时间的会话数?

2020-05-31 00:00:01 | 2020-05-31 00:00:31   
2020-05-31 00:01:01 | 2020-05-31 00:02:01   
2020-05-31 00:02:01 | 2020-05-31 00:06:03   
2020-05-31 00:03:01 | 2020-05-31 00:04:01   
2020-05-31 00:04:01 | 2020-05-31 00:34:01   

所以我有一个尝试,也许有人可以在此基础上建立(否则它可能会打扰某人以提供更好的答案😂)? 这是您的数据,我刚刚添加了列名:

In[1]: df
Out[1]: 
Session_Starts        Session_Ends
0 2020-05-31 00:00:01 2020-05-31 00:00:31
1 2020-05-31 00:01:01 2020-05-31 00:02:01
2 2020-05-31 00:02:01 2020-05-31 00:06:03
3 2020-05-31 00:03:01 2020-05-31 00:04:01
4 2020-05-31 00:04:01 2020-05-31 00:34:01

我将每个会话的开始和结束四舍五入一分钟,然后在新的开始和结束时间之间做一个date_range(以分钟频率)。 这给出了每个会话处于活动状态时唯一分钟数的数组数组。 然后,我将此列表解压缩到Series中并获取value_counts()

import pandas as pd
import numpy as np
from itertools import chain
session_starts = (x - pd.Timedelta(seconds=x.second) for x in df['Session_Starts'])
session_ends = (x - pd.Timedelta(seconds=x.second) for x in df['Session_Ends'])
ranges = (pd.date_range(x,y,freq='1T') for x,y in zip(session_starts,session_ends))
ranges = pd.Series(chain.from_iterable(ranges))
output = ranges.value_counts(sort=False).sort_index()

输出:

2020-05-31 00:00:00    1
2020-05-31 00:01:00    1
2020-05-31 00:02:00    2
2020-05-31 00:03:00    2
2020-05-31 00:04:00    3
2020-05-31 00:05:00    2
2020-05-31 00:06:00    2
2020-05-31 00:07:00    1
2020-05-31 00:08:00    1
...
2020-05-31 00:33:00    1
2020-05-31 00:34:00    1
dtype: int64

问题是规模,也就是你说的数百万次观察。 我正在尝试使用长度低于一百万的玩具数据,它已经开始花费很长时间:

SIZE = 100000
dr = pd.date_range(start='01-01-2020',end='1-02-2020',freq='1T')
col1 = np.random.choice(dr, SIZE)
deltas = pd.Series([pd.Timedelta(minutes = r) for r in np.random.randint(0,10,size=SIZE)])
col2 = col1 + deltas
df = pd.DataFrame({'Session_Starts':col1,'Session_Ends':col2})

使用timeit,通过上面的相同代码运行此df需要20 多秒。 我相信时间是 ~与行数成线性关系。

我想不出更好的办法,但我相信一定有;我很想知道如何改进它(或者只是一个更好的解决方案)。 希望这要么有帮助,要么至少能让球滚动。

我最初的方法是创建一个表示包含数据中所有事件的时间段的DatetimeIndex,然后为每个事件创建一个数组,其维度与索引相同,其值在事件发生时1True,否则0False。 添加这些数组将生成每次的并发事件总数。 一个更好的方法是只考虑新事件开始(+1)或结束(-1)的时间,然后取这些变化的累积总和。 我们可以通过重新索引和填充将这些结果扩展到包含事件的整个时间段。

加载数据

import pandas as pd
# Data from the question
data = [['2020-05-31 00:00:01', '2020-05-31 00:00:31'],
['2020-05-31 00:01:01', '2020-05-31 00:02:01'],
['2020-05-31 00:02:01', '2020-05-31 00:06:03'],
['2020-05-31 00:03:01', '2020-05-31 00:04:01'],
['2020-05-31 00:04:01', '2020-05-31 00:34:01']]
# The data as a DataFrame
df = pd.DataFrame(data,  columns=['Start time', 'End time'], dtype='datetime64[ns]')

创建DatetimeIndex

频率与事件时间戳的时间粒度相匹配是有意义的。

min_time = df['Start time'].min()
max_time = df['End time'].max()
ts_index = pd.date_range(min_time, max_time, freq = 's')

计算并发

在前两种方法中,我们创建一个数据结构,该结构对应于与每个事件的索引相同维度的数组。 这些数组指示事件发生的时间。 如果有很多事件,最好创建一个迭代器,否则我们有内存不足的风险。 第三种方法侧重于事件的开始和结束,而不是在整个时期内描述单个事件。

1. 与系列

这个小示例没有内存不足的风险,因此我们创建了一系列数组并添加它们。

concurrency_array = df.apply(lambda e: ((ts_index >= e[0]) & (ts_index <= e[1])).astype(int), axis='columns').sum()
concurrency = pd.Series(concurrency_array, index = ts_index)

2. 使用迭代器

这将避免一次将所有数组加载到内存中。 请注意,这里我们使用pythonmapsum函数,而不是pandas结构。

concurrency_iter = map(lambda e: (ts_index >= e[0]) & (ts_index <= e[1]), df.values)
concurrency = pd.Series(sum(concurrency_iter), index = ts_index)

3. 仅包含一系列更改(最佳)

这种方法比我想出的任何东西都要快得多,而且总的来说,它更好。 我从这个答案中得到了这个想法。

基本上,我们创建一个系列,其中包含所有事件的所有开始和结束时间,开始时间的值为1,结束时间的值为-1。 然后我们groupby索引值和sum,从而产生一个包含所有变化(即事件开始、结束以及两者的任意组合)的序列。 然后我们取累积总和 (cumsum),它产生并发事件在变化时的总并发事件,即在至少一个事件开始或结束时。 为了获得整个周期的结果,我们只需使用之前创建的索引reindex并向前填充(ffill)。

starts = pd.Series(1, df['Start time'])
ends = pd.Series(-1, df['End time'] + pd.Timedelta('1 sec')) # Include last second
concurrency_changes = pd.concat([starts, ends]) 
.groupby(level=0).sum() 
.cumsum()
concurrency = concurrency_changes.reindex(ts_index, method='ffill')

Result

上述所有方法的结果是一个序列,其索引是我们之前创建的DatetimeIndex,其值是数据中并发事件的总和。

resampling

现在我们有一个包含并发数据的序列,我们可以在方便的时候重新采样。 例如,如果我们正在调查某个资源的最大利用率,我们可能会执行以下操作:

In [5]: concurrency.resample('5T').max()
Out[5]:
2020-05-31 00:00:00    3
2020-05-31 00:05:00    2
2020-05-31 00:10:00    1
2020-05-31 00:15:00    1
2020-05-31 00:20:00    1
2020-05-31 00:25:00    1
2020-05-31 00:30:00    1
Freq: 5T, dtype: int64

最新更新