如何在2个窗口上进行计算/比较



我正在收集具有不同 id 的事件,传入事件中有 n 种类型的固定 ID。我想根据时间框架或否收集过去事件的平均值。不同类型的 ID 之间的事件。假设有 2 台设备发送 ID 为"a"和"b"的数据/事件。我想获取两个设备过去 5 分钟数据的平均值,然后比较两个平均值以做出一些决定。

通过此代码,我正在收集过去 n 分钟数据的数据并存储在 2 个窗口中。`

@source(type='http', receiver.url='http://localhost:5007/SweetProductionEP', @map(type = 'json'))
define stream InProduction(name string, amount int);

define window hold_a(avg_amount double) length(1);
define window hold_b(avg_amount double) length(1);

from InProduction[name=='a']#window.timeBatch(5 min)
select avg(amount) as avg_amount
group by name
insert into hold_a;
from InProduction[name=='b']#window.timeBatch(5 min)
select avg(amount) as avg_amount
group by name
insert into hold_b;`

窗口hold_a和hold_b将获得过去 5 分钟的平均数据。现在我想比较来自两个窗口的数据并做出决定。

我已经尝试在两个窗口上加入,但加入查询没有执行。

您必须使用模式来实现此目的。在查询下方输出具有最高平均值的名称到最高平均流。

@source(type='http', receiver.url='http://localhost:5007/SweetProductionEP', @map(type = 'json'))
define stream InProduction(name string, amount int);
from InProduction[name=='a']#window.timeBatch(5 min)
select avg(amount) as avg_amount, name
insert into avgStream;
from InProduction[name=='b']#window.timeBatch(5 min)
select avg(amount) as avg_amount, name
insert into avgStream;`
from every(e1=avgStream -> e2=avgStream)
select ifthenelse(e1.avg_amount>e2.avg_amount,e1.name,e2.name) as highestAvgName
insert into HighestAvgStream;

最新更新