我们的传感器以不规则的时间间隔产生值:
12:00 1012:02 2012:22 30下午12:29
我正在努力寻找一个时间序列数据库,它可以自动计算某些规则时间间隔(例如10分钟)的平均值。当然,一个值在区间内有效的时间越长,它在平均值(时间加权平均值)中的权重就越大。(例如12:00-12:10:(10*2+20*8)/10=18)
我现在在互联网上搜索了几个小时,发现了很多关于不规则时间序列的时间序列数据库(如InfluxDB、OpenTDSB等),其中大多数都有一些类似SQL的查询语言,具有聚合功能。
不幸的是,他们没有说明不规则时间间隔的平均值。由于我不想全部尝试,有人能告诉我哪些数据库支持时间加权平均值的计算吗?谢谢
OpenTSDB在查询所暗示的时间跨查询中的所有系列执行聚合。对于任何在时间戳处没有数据值的序列,它会从之前和之后的值中线性插值。它在查询时进行这种"上采样"——原始数据总是按照到达时的原样存储。你可以执行尾随窗口时间平均,但不能执行指数加权移动平均(我相信这就是你所说的时间加权?)
http://opentsdb.net/docs/build/html/user_guide/query/aggregators.html
(我应该补充一点,这不是对OpenTSDB的全面建议,因为它是你应该使用的数据库,我只是在回答你的问题)
我最近不得不为我们自己的SCADA/IoT产品提供一个不规则样本加权平均的解决方案,数据存储在PostgreSQL中。如果你想自己滚动,下面是你可以做的。
让我们假设下表:
create table samples (
stamp timestamptz,
series integer,
value float
);
insert into samples values
('2018-04-30 23:00:00+02', 1, 12.3),
('2018-05-01 01:45:00+02', 1, 22.2),
('2018-05-01 02:13:00+02', 1, 21.6),
('2018-05-01 02:26:00+02', 1, 14.9),
('2018-05-01 03:02:00+02', 1, 16.9);
要计算常规加权平均值,我们需要执行以下操作:
- 将不规则样本"划分"为规则周期
- 确定每个样本的保存时间(持续时间)
- 计算每个样本的重量(其持续时间除以周期)
- 每个周期的总价值乘以权重
在展示代码之前,我们将做出以下假设:
- 加权平均值是针对给定时间范围和给定周期计算的
- 我们不需要处理零值,这会使解决方案稍微复杂一些(即在计算权重时)
- 该代码是使用两种技术为PostgreSQL编写的:通用表表达式和窗口函数。如果您使用另一个DB,您可能需要以不同的方式编写它
1.将不规则样本转换为规则周期
假设我们有兴趣计算序列1
的2018-05-01 00:00:00+02
和2018-05-01 04:00:00+02
之间时间段的小时加权平均值。我们将从查询给定的时间范围开始,添加一个对齐的戳:
select
stamp,
to_timestamp(extract (epoch from stamp)::integer / 3600 * 3600)
as stamp_aligned,
value
from samples
where
series = 1 and
stamp >= '2018-05-01 00:00:00+02' and
stamp <= '2018-05-01 04:00:00+02';
这给了我们:
stamp | stamp_aligned | value
------------------------+------------------------+-------
2018-05-01 01:45:00+02 | 2018-05-01 01:00:00+02 | 22.2
2018-05-01 02:13:00+02 | 2018-05-01 02:00:00+02 | 21.6
2018-05-01 02:26:00+02 | 2018-05-01 02:00:00+02 | 14.9
2018-05-01 03:02:00+02 | 2018-05-01 03:00:00+02 | 16.9
(4 rows)
我们会注意到:
- 从结果中,我们无法判断
00:00:00
的值,也无法判断01:00:00
的值 stamp_aligned
列告诉我们该记录属于哪个时间段,但实际上该表缺少每个时间段开始时的值
为了解决这些问题,我们将查询给定时间范围之前的最后一个已知值,并添加舍入小时的记录,稍后我们将用正确的值填充这些记录:
with
t_values as (
select * from (
-- select last value prior to time range
(select
stamp,
to_timestamp(extract(epoch from stamp)::integer / 3600 * 3600)
as stamp_aligned,
value,
false as filled_in
from samples
where
series = 1 and
stamp < '2018-05-01 00:00:00+02'
order by
stamp desc
limit 1) union
-- select records from given time range
(select
stamp,
to_timestamp(extract(epoch from stamp)::integer / 3600 * 3600)
as stamp_aligned,
value,
false as filled_in
from samples
where
series = 1 and
stamp >= '2018-05-01 00:00:00+02' and
stamp <= '2018-05-01 04:00:00+02'
order by
stamp) union
-- select all regular periods for given time range
(select
stamp,
stamp as stamp_aligned,
null as value,
true as filled_in
from generate_series(
'2018-05-01 00:00:00+02',
'2018-05-01 04:00:00+02',
interval '3600 seconds'
) stamp)
) states
order by stamp
)
select * from t_values;
这给了我们
stamp | stamp_aligned | value | filled_in
------------------------+------------------------+-------+-----------
2018-04-30 23:00:00+02 | 2018-04-30 23:00:00+02 | 12.3 | f
2018-05-01 00:00:00+02 | 2018-05-01 00:00:00+02 | ¤ | t
2018-05-01 01:00:00+02 | 2018-05-01 01:00:00+02 | ¤ | t
2018-05-01 01:45:00+02 | 2018-05-01 01:00:00+02 | 22.2 | f
2018-05-01 02:00:00+02 | 2018-05-01 02:00:00+02 | ¤ | t
2018-05-01 02:13:00+02 | 2018-05-01 02:00:00+02 | 21.6 | f
2018-05-01 02:26:00+02 | 2018-05-01 02:00:00+02 | 14.9 | f
2018-05-01 03:00:00+02 | 2018-05-01 03:00:00+02 | ¤ | t
2018-05-01 03:02:00+02 | 2018-05-01 03:00:00+02 | 16.9 | f
2018-05-01 04:00:00+02 | 2018-05-01 04:00:00+02 | ¤ | t
(10 rows)
因此,我们每个时间段至少有一条记录,但我们仍然需要为填写的记录填写值:
with
t_values as (
...
),
-- since records generated using generate_series do not contain values,
-- we need to copy the value from the last non-generated record.
t_with_filled_in_values as (
-- the outer query serves to remove any record prior to the given
-- time range
select *
from (
select
stamp,
stamp_aligned,
-- fill in value from last non-filled record (the first record
-- having the same filled_in_partition value)
(case when filled_in then
first_value(value) over (partition by filled_in_partition
order by stamp) else value end) as value
from (
select
stamp,
stamp_aligned,
value,
filled_in,
-- this field is incremented on every non-filled record
sum(case when filled_in then 0 else 1 end)
over (order by stamp) as filled_in_partition
from
t_values
) t_filled_in_partition
) t_filled_in_values
-- we wrap the filling-in query in order to remove any record before the
-- beginning of the given time range
where stamp >= '2018-05-01 00:00:00+02'
order by stamp
)
select * from t_with_filled_in_values;
这给了我们以下信息:
stamp | stamp_aligned | value
------------------------+------------------------+-------
2018-05-01 00:00:00+02 | 2018-05-01 00:00:00+02 | 12.3
2018-05-01 01:00:00+02 | 2018-05-01 01:00:00+02 | 12.3
2018-05-01 01:45:00+02 | 2018-05-01 01:00:00+02 | 22.2
2018-05-01 02:00:00+02 | 2018-05-01 02:00:00+02 | 22.2
2018-05-01 02:13:00+02 | 2018-05-01 02:00:00+02 | 21.6
2018-05-01 02:26:00+02 | 2018-05-01 02:00:00+02 | 14.9
2018-05-01 03:00:00+02 | 2018-05-01 03:00:00+02 | 14.9
2018-05-01 03:02:00+02 | 2018-05-01 03:00:00+02 | 16.9
2018-05-01 04:00:00+02 | 2018-05-01 04:00:00+02 | 16.9
(9 rows)
所以我们都很好-我们添加了所有小时都具有正确值的记录,还删除了第一条记录,该记录为我们提供了时间范围开始时的值,但超出了该值。不,我们已经为下一步做好了准备。
2.计算加权平均值
我们将继续计算每条记录的持续时间:
with
t_values as (
...
),
t_with_filled_in_values (
...
),
t_with_weight as (
select
stamp,
stamp_aligned,
value,
-- use window to get stamp from next record in order to calculate
-- the duration of the record which, divided by the period, gives
-- us the weight.
coalesce(extract(epoch from (lead(stamp)
over (order by stamp) - stamp)), 3600)::float / 3600 as weight
from t_with_filled_in_values
order by stamp
)
select * from t_with_weight;
这给了我们:
stamp | stamp_aligned | value | weight
------------------------+------------------------+-------+--------------------
2018-05-01 00:00:00+02 | 2018-05-01 00:00:00+02 | 12.3 | 1
2018-05-01 01:00:00+02 | 2018-05-01 01:00:00+02 | 12.3 | 0.75
2018-05-01 01:45:00+02 | 2018-05-01 01:00:00+02 | 22.2 | 0.25
2018-05-01 02:00:00+02 | 2018-05-01 02:00:00+02 | 22.2 | 0.216666666666667
2018-05-01 02:13:00+02 | 2018-05-01 02:00:00+02 | 21.6 | 0.216666666666667
2018-05-01 02:26:00+02 | 2018-05-01 02:00:00+02 | 14.9 | 0.566666666666667
2018-05-01 03:00:00+02 | 2018-05-01 03:00:00+02 | 14.9 | 0.0333333333333333
2018-05-01 03:02:00+02 | 2018-05-01 03:00:00+02 | 16.9 | 0.966666666666667
2018-05-01 04:00:00+02 | 2018-05-01 04:00:00+02 | 16.9 | 1
(9 rows)
剩下的就是总结:
with
t_values as (
...
),
t_with_filled_in_values (
...
),
t_with_weight as (
...
)
select
stamp_aligned as stamp,
sum(value * weight) as avg
from t_with_weight
group by stamp_aligned
order by stamp_aligned;
结果:
stamp | avg
------------------------+------------------
2018-05-01 00:00:00+02 | 12.3
2018-05-01 01:00:00+02 | 14.775
2018-05-01 02:00:00+02 | 17.9333333333333
2018-05-01 03:00:00+02 | 16.8333333333333
2018-05-01 04:00:00+02 | 16.9
(5 rows)
你可以在这个要点中找到完整的代码。
CCD_ 7以与当前时间相比线性下降的速率对较旧样本进行加权。
REST API、SQL层和规则引擎都支持此聚合器。
编辑2016-06-15T12:520Z:支持的插值函数:
- 线性
- 以前的
- 下一个
- 价值(v)
- 无
披露:我在Axibase工作。
如果TSDB支持给定时间范围内的值积分函数,则可以计算时间加权平均值(TWA)。然后TWA可以计算为给定持续时间的积分除以持续时间。例如,以下查询计算VictoriaMetrics:中最后一小时度量power
的时间加权平均值
integrate(power[1h])/1h
请参阅MetricsQL文档中有关integrate()
函数的更多详细信息。