如何使用熊猫创建索引循环 FIFO 缓冲区



我试图创建一个索引的循环FIFO(先进先出)缓冲区,用于保存烛台图的最后90分钟,用于保存按分钟(即window_size=150)聚合的熊猫数据帧中一组15个资产的烛台图的最后90分钟,以实时显示在客户端应用程序(蜡烛图)上。它将在每个(1m)时间步长分别保持每种资产的收盘价、开盘价、高价、低价和成交量特征。单个烛条将由 websocket 更新,其中最新的时间间隔将根据价格(烛条)变化进行更新。在 pandas 中表示此数据结构的最有效机制是什么,客户端应用程序需要输出形状 [5,15,90],如as_frame分别表示 [收盘、开盘、高、低、交易量] 的 [收盘、开盘、高、低、交易量] 的 15 个资产和 90 个区间

。因此,数据将表示为:

assets  time   close    open    high    low    volume
asset1  time1  0.001    0.002   0.003   0.001  0.001
time2  0.001    0.001   0.003   0.001  0.001
...
time90 ...
...
asset15 time1  0.001    0.002   0.003   0.001  0.001
time2  0.001    0.001   0.003   0.001  0.001
...
time90 ...

我已经用python熊猫实现了一个天真的解决方案:

class Buffer():
def __init__(self):
self.cols = [
'asset',
'timestamp',
'close',
'high',
'low'
];
self.lvls = [
'asset',
'timestamp'
]
self.frame = pd.DataFrame(
columns=self.cols
);
self.frame.set_index(self.lvls)
def add(
self,
entry
):
... what would be the most effective
mechanism to add to the multi indexed
dataframe given the entry/record 
{
"asset":"ASSET",
"timestamp": 158090000, 
"close":1.3, 
"high":1.4, 
"low":1.2, 
"open":1.3, 
"volume":134.5 
} 
such that the dataframe timestamp 
index does not exceed the given 
window size?
def as_frame(
self,
assets,
features,
window_size
):
outframe = self.frame.set_index(self.lvls)
outframe = outframe.groupby(self.lvls).last()
outlist = outframe.to_xarray().to_array()
return outlist

如何在熊猫中最有效地实现上述问题? 假设不存在给定另一种数据结构的更优化/性能更高的解决方案?

添加数据应该像使用带有dictappend一样简单,过滤目标DataFrame中的列。

self.frame.append({k:v for k, v in entry.items() if k in self.cols})

根据您的问题,我认为您希望保留一个DataFrame,该在传入数据上保持连续的 150 秒窗口。

为了过滤生成的帧,我会从最新添加的时间窗口计算"开始"时间戳,并使用它来过滤行,如下所示:

start_ts = entry['timestamp'] - 150
self.frame = self.frame[ self.frame['ts'] >= start_ts ]

根据数据帧的大小,您可能会获得更好的性能下降:

self.frame.drop(self.frame[ self.frame['ts'] < start_ts ].index, inplace=True)

最新更新