Flink CEP无法在统一的表上获得正确的结果

  • 本文关键字:结果 Flink CEP apache-flink
  • 更新时间 :
  • 英文 :


我使用Flink SQL和CEP来识别一些非常简单的模式。然而,我发现了一个奇怪的东西(可能是一个bug(。我有两个示例表password_changetransfer,如下所示。

传输

transid,accountnumber,sortcode,value,channel,eventtime,eventtype
1,123,1,100,ONL,2020-01-01T01:00:01Z,transfer
3,123,1,100,ONL,2020-01-01T01:00:02Z,transfer
4,123,1,200,ONL,2020-01-01T01:00:03Z,transfer
5,456,1,200,ONL,2020-01-01T01:00:04Z,transfer

密码更改

accountnumber,channel,eventtime,eventtype
123,ONL,2020-01-01T01:00:05Z,password_change
456,ONL,2020-01-01T01:00:06Z,password_change
123,ONL,2020-01-01T01:00:08Z,password_change
123,ONL,2020-01-01T01:00:09Z,password_change

以下是我的SQL查询。

首先创建一个临时视图事件作为

(SELECT accountnumber,rowtime,eventtype FROM password_change WHERE channel='ONL') 
UNION ALL 
(SELECT accountnumber,rowtime, eventtype FROM transfer WHERE channel = 'ONL' )

rowtime列是直接从原始eventtimecol中提取的事件时间,水印周期性绑定为1秒。

然后输出的查询结果

SELECT * FROM `event`
MATCH_RECOGNIZE ( 
PARTITION BY accountnumber 
ORDER BY rowtime 
MEASURES 
transfer.eventtype AS event_type,
transfer.rowtime AS transfer_time
ONE ROW PER MATCH 
AFTER MATCH SKIP PAST LAST ROW
PATTERN (transfer password_change )  WITHIN INTERVAL '5' SECOND 
DEFINE 
password_change AS eventtype='password_change', 
transfer AS eventtype='transfer' 
)

应输出

123,transfer,2020-01-01T01:00:03Z
456,transfer,2020-01-01T01:00:04Z

但我在运行Flink 1.11.1时一无所获(1.10.1也没有输出(

更重要的是,我将模式更改为仅password_change,它仍然不输出任何内容,但如果我将模式改为transfer,它将输出几行,但不是所有的传输行。如果我交换两个表的事件时间,这意味着让password_changes首先发生,那么模式password_change将输出几行,而transfer不输出。

另一方面,如果我从两个表中提取这些列,并手动将它们合并到一个表中,然后将它们发送到Flink中,则运行结果是正确的。

我搜索并尝试了很多方法,包括更改SQL语句、水印、缓冲区超时等,但都无济于事。希望这里的任何人都能帮忙。谢谢

2020年10月10日更新:

我使用卡夫卡作为表源。tEnvStreamTableEnvironment

Kafka kafka=new Kafka()
.version("universal")
.property("bootstrap.servers", "localhost:9092");
tEnv.connect(
kafka.topic("transfer")
).withFormat(
new Json()
.failOnMissingField(true)
).withSchema(
new Schema()
.field("rowtime",DataTypes.TIMESTAMP(3))
.rowtime(new Rowtime()
.timestampsFromField("eventtime")
.watermarksPeriodicBounded(1000)
)
.field("channel",DataTypes.STRING())
.field("eventtype",DataTypes.STRING())
.field("transid",DataTypes.STRING())
.field("accountnumber",DataTypes.STRING())
.field("value",DataTypes.DECIMAL(38,18))
).createTemporaryTable("transfer");

tEnv.connect(
kafka.topic("pchange")
).withFormat(
new Json()
.failOnMissingField(true)
).withSchema(
new Schema()
.field("rowtime",DataTypes.TIMESTAMP(3))
.rowtime(new Rowtime()
.timestampsFromField("eventtime")
.watermarksPeriodicBounded(1000)
)
.field("channel",DataTypes.STRING())
.field("accountnumber",DataTypes.STRING())
.field("eventtype",DataTypes.STRING())
).createTemporaryTable("password_change");

感谢@Dawid Wysakowicz的回答。为了确认这一点,我在transfer表的末尾添加了4,123,1,200,ONL,2020-01-01T01:00:10Z,transfer,然后输出就正确了,这意味着水印确实存在一些问题。

所以现在的问题是如何修复它。由于用户不会频繁更改密码,因此这两个表之间的时间间隔是不可避免的。我只需要UNIONALL表具有与我手动合并的行为相同的行为。

2020年11月4日更新:
使用空闲源的水印策略可能会有所帮助。

问题很可能是与UNION ALL运算符一起生成水印。你能分享一下你是如何创建这两个表的吗?包括你如何定义时间属性以及连接器是什么?它可以让我证实我的怀疑。

我认为问题是其中一个源停止发射水印。如果transfer表(或具有较低时间戳的表(没有完成并且没有生成任何记录,则它不会发出水印。在发射第四行之后,它将发射CCD_ 11。输入并集的水印是这两个值中最小的一个。因此,第一个表将暂停/保留值为Watermark = 3的水印,因此您看不到原始查询的任何进展,并且您看到为具有较小时间戳的表发出的一些记录。

如果手动连接这两个表,则只有一个输入和一个水印源,因此它会进一步发展,您会看到一些结果。

相关内容

  • 没有找到相关文章

最新更新