熊猫将不规则的时间序列与不同的频率对齐



我需要随时间比较 3 个时间序列。显然,它们需要对齐才能具有可比性。不幸的是,3 个时间序列中有 2 个是不规则的。另外 2 个范围从每天 2 到 500k 次观测值不等,每ID和每天。

初始时间序列每 300 毫秒可用一次,并且可以与其他两个时间序列连接。

但是我有2个问题:

  1. 我上面介绍的这3个序列是ID, time, value的格式,即每个序列构成每个组的一个单独的时间序列
  2. 制定连接条件,即假设LEFT和最细粒度在一段时间内可连接,因为可能没有完全匹配

编辑

一些虚拟数据

import pandas as pd
from datetime import datetime
import numpy as np
def make_df(frequency, valueName):
date_rng = pd.date_range(start='2018-01-01', end='2018-01-02', freq=frequency)
ts = pd.Series(np.random.randn(len(date_rng)), index=date_rng)
groups = ['a', 'b', 'c', 'd', 'e']
group_series = [groups[np.random.randint(len(groups))] for i in range(0, len(date_rng))]
df = pd.DataFrame(ts, columns=[valueName])
df['group'] = group_series
return df
df_1 = make_df('ms', 'value_A')
display(df_1.head())
df_2 = make_df('H', 'value_B')
display(df_2.head())
df_3 = make_df('S', 'value_C')
display(df_3.head())

代码(都不是真正的pythonic(: 我正在尝试一些类似于 SQL 中a JOIN b ON a.group = b.group AND time in window(some_seconds)的非等连接,但如果有多个记录匹配,即不仅第一个,而且所有记录都匹配/生成一行,这就会出现问题。

另外,我调整了类似于(spark(的数据分组:df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")但这可能是非常有损的。

然后我发现(熊猫(熊猫将多个数据帧与时间戳索引对齐,这看起来已经很有趣,但只产生完全匹配。但是,当尝试使用df_2.join(df_3, how='outer', on=['group'], rsuffix='_1')时,它不仅在(确切(时间上加入,而且group失败,并显示需要pd.concat的错误。

经过更多的搜索,我找到了 (pyspark( https://github.com/twosigma/flint 它在一个间隔内实现了时间序列连接 - 但是,我在使用它时遇到了问题。

我找不到一种简单的方法来在熊猫中做到这一点 - 所以我直接在 Spark 中执行此操作。

燧石是我选择的工具。最初,弗林特不适用于 spark 2.2,但在这里修复了我:https://github.com/geoHeil/flint/commit/a2827d38e155ec8ddd4252dc62d89181f14f0c47 以下内容工作正常:

val left = Seq((1,1L, 0.1), (1, 2L,0.2), (3,1L,0.3), (3, 2L,0.4)).toDF("groupA", "time", "valueA")
val right = Seq((1,1L, 11), (1, 2L,12), (3,1L,13), (3, 2L,14)).toDF("groupB", "time", "valueB")
val leftTs = TimeSeriesRDD.fromDF(dataFrame = left)(isSorted = false, timeUnit = MILLISECONDS)
val rightTS        = TimeSeriesRDD.fromDF(dataFrame = right)(isSorted = false, timeUnit = MILLISECONDS)
val mergedPerGroup = leftTs.leftJoin(rightTS, tolerance = "1s")

即,它对所有组执行某种笛卡尔连接:

mergedPerGroup.toDF.filter(col("groupA") === col("groupB")).show
+-------+------+------+------+------+
|   time|groupA|valueA|groupB|valueB|
+-------+------+------+------+------+
|1000000|     3|   0.3|     3|    13|
|2000000|     3|   0.4|     3|    14|

要删除重复项,请使用不同的重复项。

最新更新