如何在flink SQL表中自动生成水印?



我正在测试flink cep sql和我的水印被定义为行时间,我的表是一个kafka表。由于水印依赖于所有kafka分区的最小值,所以每个新消息都必须等待kafka分区对齐,然后cep触发结果。

我的kafka表(topic有3个分区)定义为

create table test_table(
agent_id String, room_id String, 
create_time Bigint, 
call_type String, 
application_id String, 
connect_time Bigint, 
row_time as to_timestamp_ltz(create_time, 3), 
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
)

这是我的cep sql

select * from test_table  match_recognize (
partition by agent_id,room_id,call_type 
order by row_time
measures  
last(BF.create_time) as create_time, 
last(AF.connect_time) as connect_time 
one row per match after match SKIP PAST LAST ROW 
pattern (BF+ AF) WITHIN INTERVAL '1' HOUR 
define 
BF as BF.connect_time = 0,
AF as AF.connect_time > 0 and BF.room_id = AF.room_id and BF.call_type = AF.call_type 
) as T ;

cep sql触发结果是正确的,但总是延迟,因为每个分区都需要对齐水印。如何立即得到最新的结果自动生成水印在flink sql表

?

您的模式要求查找connect_time > 0的行,该行紧接connect_time = 0的行(其中两行具有相同的room_id和call_type)。为了使这种模式匹配完全正确地完成,有必要等待水印。否则,过早的匹配可能会由于无序事件的到来而失效——例如,在AF之前的connect_time < 0事件(您可能知道这是不可能的,但cep/sql引擎不可能知道这一点)。

如果您愿意放松模式匹配语义,为什么不将这个MATCH_RECOGNIZE查询替换为间隔连接(带有时间约束的自连接)呢?详见https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/joins/#interval-joins

BTW,AF的这一部分定义

... and BF.room_id = AF.room_id and BF.call_type = AF.call_type

没有任何作用,因为流已经被room_idcall_type分区了。

相关内容

  • 没有找到相关文章

最新更新