当流事件加入 WSO2 CEP 中时,每个输出事件触发两次



我正在使用嵌入在WSO2 DAS中的WSO2 CEP对于以下执行计划

@Import('RelatedStream:1.0.0')
define stream rs (ID string, product string, uc1 string, state string, brand string, model string, type string, tweet string, rtID string);
@Import('InputStream:1.0.0')
define stream ins (ID string, product string, uc1 string, state string, brand string, model string, type string, tweet string);

@Export('matchingStream:1.0.0')
define stream ms (rID string, rproduct string, ruc1 string, rstate string, rbrand string, rmodel string, rtype string, hID string, hproduct string, huc1 string, hstate string, hbrand string, hmodel string, htype string);
from ins#window.time(2 sec) as R 
  join rs#window.length(1) as H
  on R.product == H.product and R.brand==H.brand and R.type==H.type and R.model==H.model and R.state!=H.state and R.ID==H.rtID
select R.ID as rID, R.product as rproduct , R.uc1 as ruc1 , R.state as rstate, R.brand as rbrand , R.model as rmodel , R.type as rtype ,H.ID as hID , H.product as hproduct , H.uc1 as huc1 , H.state as hstate , H.brand as hbrand, H.model as hmodel , H.type as htype
insert all events into ms;

每个输出事件生成两次具有相同值的

次数

事件消息跟踪器日志如下所示

一(输入流)

08:23:21,845 [-] [DataBridge-Core-pool-1-thread-3]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : InputStream:1.0.0 (ins), before processing _Event{timestamp=1464144801440, data=[27, phone, g1, sell, samsung, galaxy note, type, Use UM10 to get 10% OFF #unlockyourphone Galaxy Note 6 will reportedly be the first Samsung phone to feature US... sell], isExpired=false} (Sanitized)

分流二(相关流)

08:23:21,974 [-] [DataBridge-Core-pool-1-thread-2]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801943, data=[5, phone, g1, sell, samsung, iphone 4s, type, White 3 USB Port Car Charger Adapter For iPhone 4S 5S 5C 6 6S iPad Samsung Phone - Bid Now? sell, 27], isExpired=false} (Sanitized)
08:23:21,998 [-] [DataBridge-Core-pool-1-thread-4]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801944, data=[11, phone, g1, buy, samsung, galaxy s4, type, #cellular #deals  Samsung Galaxy S4 SCH-I545 16GB Verizon AT&T GSM UNLOCKED Cell Phone RF buy, 27], isExpired=false} (Sanitized)
08:23:22,030 [-] [DataBridge-Core-pool-1-thread-5]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801944, data=[13, phone, g1, sell, samsung, galaxy note, type, UNLOCKED T-Mobile Samsung Galaxy Note 3 SM-N900T 4G LTE GSM 32GB Smart Phone  sell, 27], isExpired=false} (Sanitized)
08:23:22,031 [-] [DataBridge-Core-pool-1-thread-5]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801944, data=[18, phone, g1, sell, samsung, galaxy s6, type, Cell Phones : New Samsung Galaxy S6 Edge SM-G925F 5.1'' 16MP (FACTORY UNLOCKED) 32GB Phone  sell, 27], isExpired=false} (Sanitized)
08:23:22,031 [-] [DataBridge-Core-pool-1-thread-5]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801944, data=[19, phone, g1, sell, samsung, galaxy s4, type, #cellular #deals  Samsung i545 Galaxy S4 16GB Verizon 13MP Camera WiFi Cell Phone sell, 27], isExpired=false} (Sanitized)
08:23:22,031 [-] [DataBridge-Core-pool-1-thread-5]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801944, data=[1, phone, g1, sell, samsung, galaxy s6, type, Cell Phone USA : New Samsung Galaxy S6 Edge SM-G925F 5.1'' 16MP (FACTORY UNLOCKED) 32GB Ph?  sell, 27], isExpired=false} (Sanitized)
08:23:22,033 [-] [DataBridge-Core-pool-1-thread-5]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801944, data=[14, phone, g1, sell, samsung, galaxy note, type, Galaxy Note 6 will reportedly be the first Samsung phone to feature USB-C -  sell, 27], isExpired=false} (Sanitized)
08:23:22,034 [-] [DataBridge-Core-pool-1-thread-5]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801945, data=[16, phone, g1, buy, samsung, galaxy note, type, #unlocksquare Galaxy Note 6 will reportedly be the first Samsung phone to feature USB-C  buy, 27], isExpired=false} (Sanitized)
08:23:22,035 [-] [DataBridge-Core-pool-1-thread-5]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801945, data=[17, phone, g1, buy, samsung, galaxy s5, type, Cell Phone USA : NEW T-Mobile Protective Cover/Holster For Samsung Galaxy S5 Case Kickstan?  buy, 27], isExpired=false} (Sanitized)
08:23:22,035 [-] [DataBridge-Core-pool-1-thread-5]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : RelatedStream:1.0.0 (rs), before processing _Event{timestamp=1464144801945, data=[15, phone, g1, buy, samsung, galaxy s6, type, Cell Phone USA : Samsung Galaxy S6 SM-G920F 32GB Unlocked 16MP Smartphone  #4422  buy, 27], isExpired=false} (Sanitized)

输出流(匹配流)

08:23:22,041 [-] [Siddhi-R_H_Match-executor-thread-1]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : matchingStream:1.0.0 (ms), after processing _[Event{timestamp=1464144801945, data=[27, phone, g1, sell, samsung, galaxy note, type, 16, phone, g1, buy, samsung, galaxy note, type], isExpired=false}] (Sanitized)
08:23:22,120 [-] [Siddhi-R_H_Match-executor-thread-1]  INFO TenantId : -1234, Event Processor : R_H_Match, Event Stream : matchingStream:1.0.0 (ms), after processing _[Event{timestamp=1464144802037, data=[27, phone, g1, sell, samsung, galaxy note, type, 16, phone, g1, buy, samsung, galaxy note, type], isExpired=false}] (Sanitized)

将所有事件插入输出事件流时,它包括当前事件(传入事件)和过期事件(超时后或超过窗口长度时由窗口发出)。因此,有可能获得重复项。

如果你的要求是使用"rs"流的传入事件触发输出,你可以尝试如下操作:(当前事件)

from ins#window.time(2 sec) as R 
  join rs#window.length(0) as H
  on R.product == H.product and R.brand==H.brand and R.type==H.type and R.model==H.model and R.state!=H.state and R.ID==H.rtID
select R.ID as rID, R.product as rproduct , R.uc1 as ruc1 , R.state as rstate, R.brand as rbrand , R.model as rmodel , R.type as rtype ,H.ID as hID , H.product as hproduct , H.uc1 as huc1 , H.state as hstate , H.brand as hbrand, H.model as hmodel , H.type as htype
insert current events into ms;

最新更新