Siddhi 查询:无法使用外部时间批处理窗口进行帧查询



我们正在评估Siddhi的CEP功能,并希望写一篇论文来描述它解决我们的规则用例的查询能力。

在写论文时,我想给出一个简单的规则。在数据集上

ts                          Stock       Bid                       
------------------------------------------------------------------
2011-07-12 10:23:54.0       abc         10.12                     
2011-07-12 10:23:58.0       abc         10.34                     
2011-07-12 10:23:59.0       abc         10.75                     
2011-07-12 10:25:15.0       abc         11.98                     
2011-07-12 10:25:16.0       abc         <null>                    
2011-07-12 10:25:22.0       xyz         45.16                     
2011-07-12 10:25:27.0       xyz         49.33                     
2011-07-12 10:31:12.0       xyz         65.25                     
2011-07-12 10:31:15.0       xyz         <null>                    

要求:如果同一股票的数量大于或等于某个百分比,例如在 40 秒的窗口内发出警报,例如 60%。

到目前为止,我能够得出这个查询。

define stream StockStream (ts long,stock string, bid double); 
define window StockEventWindow (ts long, stock string, bid double)    externalTimeBatch(ts,40 sec, ts, 3 sec); 
@info(name = 'query1') 
from StockStream 
insert into StockEventWindow; 
@info(name = 'query2') 
from StockEventWindow 
select ts, stock, bid, count(stock) as c 
group by stock 
insert into OutputStream; 

在输出流获得的结果:

[Event{timestamp=1310446439000, data=[1310446439000, abc, 10.75, 3], isExpired=false}]
[Event{timestamp=1310446516000, data=[1310446516000, abc, 0.0, 2], isExpired=false}, Event{timestamp=1310446527000, data=[1310446527000, xyz, 49.33, 2], isExpired=false}]
[Event{timestamp=1310446872000, data=[1310446872000, xyz, 65.25, 1], isExpired=false}]
[Event{timestamp=1310446875000, data=[1310446875000, xyz, 0.0, 1], isExpired=false}]

无法从这里继续前进以获得理想的结果。我正在寻找一个为我提供窗口大小的函数/运算符,因此我可以将库存数量与窗口大小进行比较(如 count(stock)/windowSize>= 0.5),但没有找到任何。

预期结果是

库存尺寸为 3,窗口尺寸为 3,因此 100%

[Event{timestamp=1310446439000, data=[1310446439000, abc, 10.75, 3], isExpired=false}] 

库存尺寸为 1,窗口尺寸为 1,因此 100%

[Event{timestamp=1310446872000, data=[1310446872000, xyz, 65.25, 1], isExpired=false}]

库存尺寸为 1,窗口尺寸为 1,因此 100%

[Event{timestamp=1310446875000, data=[1310446875000, xyz, 0.0, 1], isExpired=false}]

所有这些结果的库存计数都大于窗口大小的 60%。

我还想知道我是否可以在窗口中维护任何状态?

首先定义窗口

define window StockEventWindow (ts long, stock string, bid double)    externalTimeBatch(ts,40 sec, ts); 

然后插入到窗口中的查询

from StockStream 
insert into StockEventWindow;

然后联接查询以检查当前窗口大小

from StockStream join StockEventWindow
select stock, count(*) as windowSize
insert into windowSizeStream;

然后查询以检查窗口中相关股票事件的编号

from StockStream join StockEventWindow on StockStream.stock == StockEventWindow.stock
select stock, count(*) as stockSize
insert into stockSizeStream;

然后模式查询以计算百分比并得到决策

From every e1=windowSizeStream -> e2=stockSizeStream[e1.stock == stock]
select e1.stock, (e2.stockSize/e1.windowSize)*100 as percentage
having percentage>60
insert into alert stream;

我没有运行查询。但是上面的实现线将与 siddhi 拥有的同步架构一起使用。

最新更新