我们正在评估Siddhi的CEP功能,并希望写一篇论文来描述它解决我们的规则用例的查询能力。
在写论文时,我想给出一个简单的规则。在数据集上
ts Stock Bid
------------------------------------------------------------------
2011-07-12 10:23:54.0 abc 10.12
2011-07-12 10:23:58.0 abc 10.34
2011-07-12 10:23:59.0 abc 10.75
2011-07-12 10:25:15.0 abc 11.98
2011-07-12 10:25:16.0 abc <null>
2011-07-12 10:25:22.0 xyz 45.16
2011-07-12 10:25:27.0 xyz 49.33
2011-07-12 10:31:12.0 xyz 65.25
2011-07-12 10:31:15.0 xyz <null>
要求:如果同一股票的数量大于或等于某个百分比,例如在 40 秒的窗口内发出警报,例如 60%。
到目前为止,我能够得出这个查询。
define stream StockStream (ts long,stock string, bid double);
define window StockEventWindow (ts long, stock string, bid double) externalTimeBatch(ts,40 sec, ts, 3 sec);
@info(name = 'query1')
from StockStream
insert into StockEventWindow;
@info(name = 'query2')
from StockEventWindow
select ts, stock, bid, count(stock) as c
group by stock
insert into OutputStream;
在输出流获得的结果:
[Event{timestamp=1310446439000, data=[1310446439000, abc, 10.75, 3], isExpired=false}]
[Event{timestamp=1310446516000, data=[1310446516000, abc, 0.0, 2], isExpired=false}, Event{timestamp=1310446527000, data=[1310446527000, xyz, 49.33, 2], isExpired=false}]
[Event{timestamp=1310446872000, data=[1310446872000, xyz, 65.25, 1], isExpired=false}]
[Event{timestamp=1310446875000, data=[1310446875000, xyz, 0.0, 1], isExpired=false}]
无法从这里继续前进以获得理想的结果。我正在寻找一个为我提供窗口大小的函数/运算符,因此我可以将库存数量与窗口大小进行比较(如 count(stock)/windowSize>= 0.5),但没有找到任何。
预期结果是
库存尺寸为 3,窗口尺寸为 3,因此 100%
[Event{timestamp=1310446439000, data=[1310446439000, abc, 10.75, 3], isExpired=false}]
库存尺寸为 1,窗口尺寸为 1,因此 100%
[Event{timestamp=1310446872000, data=[1310446872000, xyz, 65.25, 1], isExpired=false}]
库存尺寸为 1,窗口尺寸为 1,因此 100%
[Event{timestamp=1310446875000, data=[1310446875000, xyz, 0.0, 1], isExpired=false}]
所有这些结果的库存计数都大于窗口大小的 60%。
我还想知道我是否可以在窗口中维护任何状态?
首先定义窗口
define window StockEventWindow (ts long, stock string, bid double) externalTimeBatch(ts,40 sec, ts);
然后插入到窗口中的查询
from StockStream
insert into StockEventWindow;
然后联接查询以检查当前窗口大小
from StockStream join StockEventWindow
select stock, count(*) as windowSize
insert into windowSizeStream;
然后查询以检查窗口中相关股票事件的编号
from StockStream join StockEventWindow on StockStream.stock == StockEventWindow.stock
select stock, count(*) as stockSize
insert into stockSizeStream;
然后模式查询以计算百分比并得到决策
From every e1=windowSizeStream -> e2=stockSizeStream[e1.stock == stock]
select e1.stock, (e2.stockSize/e1.windowSize)*100 as percentage
having percentage>60
insert into alert stream;
我没有运行查询。但是上面的实现线将与 siddhi 拥有的同步架构一起使用。