如何修改下面的代码,为每个组添加运行平均值(running average),同时避免:
- 查询中任何会强制多趟扫描的构造(例如 group by 或 partition by)
- 计算运行平均值时可能发生的溢出
一个“组”是指按 id 排序后彼此相邻、且 grp 值相同的一段元组;因此必须以纯流水线方式计算组内 measure 的运行平均值,使查询只需扫描表一次。
现有代码如下,期望的示例输出附在代码之后:
-- TABLE Stream
-- One row per stream tuple: a unique, non-negative id, a non-negative group
-- tag (grp) and an integer measure.
-- \echo is a psql meta-command that prints a progress message; without the
-- backslash the word is parsed as SQL and the next statement fails.
\echo -- creating table "Stream"
drop table if exists Stream;
create table Stream (
    id      int,
    grp     int,
    measure int,
    constraint streamPK
        primary key (id),
    constraint idNotNeg
        check (id >= 0),
    constraint grpNotNeg
        check (grp >= 0)
);
-- ---------------------------------------------------------------------------
-- POPULATE: add some tuples to table Stream
-- Note that grp 0 occurs in two separate runs of ids (0-1 and 5-8).
-- (\echo is a psql meta-command that prints a progress message.)
\echo -- populating "Stream"
insert into Stream (id, grp, measure)
values
    ( 0, 0,  2),
    ( 1, 0,  3),
    ( 2, 1,  5),
    ( 3, 1,  7),
    ( 4, 1, 11),
    ( 5, 0, 13),
    ( 6, 0, 17),
    ( 7, 0, 19),
    ( 8, 0, 23),
    ( 9, 2, 29),
    (10, 2, 31),
    (11, 5, 37),
    (12, 3, 41),
    (13, 3, 43);
-- Composite type pairing an integer with a boolean "restart" flag.
-- (\echo is a psql meta-command that prints a progress message.)
\echo -- creating composite type "intRec"
drop type if exists
    intRec
    cascade;
create type intRec as (
    number  int,
    restart boolean
);
-- ---------------------------------------------------------------------------
-- runningSum_state : accumulator function
--   $1 (int)    : the sum accumulated so far; it is null until a value has
--                 been stored (the runningSum aggregate declares no initcond)
--   $2 (intRec) : the current input, (number, restart)
--   returns     : the new accumulated sum
-- (\echo is a psql meta-command that prints a progress message.)
\echo -- creating function "runningSum_state"
drop function if exists
    runningSum_state(int, intRec)
    cascade;
create function runningSum_state(int, intRec)
returns int
language plpgsql
as $f$
declare
    i alias for $1;
    a alias for $2;
    j int;
begin
    if a.restart or i is null then
        -- restart requested, or nothing accumulated yet: begin from this input
        j := a.number;
    elsif a.number is null then
        -- null input: carry the accumulated sum forward unchanged
        j := i;
    else
        j := a.number + i;
    end if;
    return j;
end
$f$;
-- ---------------------------------------------------------------------------
-- runningSum_final : returns the aggregate value
--   $1 (int) : the accumulated sum
--   returns  : an intRec carrying that sum with restart = false
-- (\echo is a psql meta-command that prints a progress message.)
\echo -- creating function "runningSum_final"
drop function if exists
    runningSum_final(int)
    cascade;
create function runningSum_final(int)
returns intRec
language sql
as $f$
    select cast(($1, false) as intRec);
$f$;
-- ---------------------------------------------------------------------------
-- runningSum : the aggregate function
--   input  : intRec (number, restart)
--   state  : int, advanced by runningSum_state
--   result : intRec, produced from the state by runningSum_final
-- (\echo is a psql meta-command that prints a progress message.)
\echo -- creating aggregate function "runningSum"
drop aggregate if exists
    runningSum(intRec)
    cascade;
create aggregate runningSum(intRec) (
    sfunc     = runningSum_state,
    stype     = int,
    finalfunc = runningSum_final
);
-- ---------------------------------------------------------------------------
-- pipeline sliding-window query that uses our aggregate function
-- (\echo is a psql meta-command that prints a progress message.)
\echo -- querying "Stream" with running sum
with
-- look at the neighbour tuple to the left (by id) to fetch its grp value;
-- the first tuple has no left neighbour and gets -1, which no stored grp
-- can equal because of the grp >= 0 check on Stream
CellLeft (id, grp, measure, lft) as (
    select id,
           grp,
           measure,
           coalesce(lag(grp) over (order by id), -1)
    from Stream
),
-- determine whether current tuple is start of a group
CellStart (id, grp, measure, start) as (
    select id,
           grp,
           measure,
           case
               when grp = lft then false
               else true
           end
    from CellLeft
),
-- bundle the measure and start-flag into an intRec
CellFlag (id, grp, intRC) as (
    select id,
           grp,
           cast((measure, start) as intRec)
    from CellStart
),
-- call our runningSum aggregator over all tuples up to the current one
CellRun (id, grp, measure, runningRC) as (
    select id,
           grp,
           (intRC).number,
           runningSum(intRC) over (
               order by id
               rows between unbounded preceding and current row
           )
    from CellFlag
),
-- extract the running sum from the composite
CellAggr (id, grp, measure, running) as (
    select id,
           grp,
           measure,
           (runningRC).number
    from CellRun
)
-- report
select id, grp, measure, running
from CellAggr
order by id;
id | grp | measure | average
----+-----+---------+------------------
0 | 0 | 2 | 2
1 | 0 | 3 | 2.5
2 | 1 | 5 | 5
3 | 1 | 7 | 6
4 | 1 | 11 | 7.66666666666667
5 | 0 | 13 | 13
6 | 0 | 17 | 15
7 | 0 | 19 | 16.3333333333333
8 | 0 | 23 | 18
9 | 2 | 29 | 29
10 | 2 | 31 | 30
11 | 5 | 37 | 37
12 | 3 | 41 | 41
13 | 3 | 43 | 42
(14行)
在SQL中,您可以将其表示为:
-- Running average of measure within each grp partition, in id order.
-- NOTE(review): partition by grp places every row with the same grp value in
-- one partition, even when those rows are not adjacent by id; the sample
-- output above restarts the average at id 5 -- confirm this is intended.
select
    t.*,
    avg(measure) over grp_by_id as group_running_avg
from t
window grp_by_id as (partition by grp order by id);
您需要信任优化器选择的执行计划,不过可以在 (grp, id, measure) 上建立索引来帮助它。