我是 Flink 的新手,我正在尝试使用它来获得我的应用程序的大量实时视图。我想构建的动态视图中至少有一个是显示未满足SLA或基本上已过期的条目,其条件是简单的时间戳比较。因此,如果最近没有发生事件,我基本上希望一个条目显示在我的动态表中。在开发环境中使用 Flink 1.6(由于 AWS Kinesis 而受到限制)时,我没有看到 Flink 正在重新评估条件,除非事件触及该条目。
我已将开发环境插入到 Kinesis 流中,该流从 Web 服务器发送实时访问日志事件。这不是我真正的用例,但很容易开始测试。我编写了一个简单的表查询,该查询拉入请求路径及其上次访问时间,并计算布尔标志以指示是否在最后一分钟内未访问它。我正在通过连接到 PrintSinkFunction 的撤回流进行调试,以便所有更新/删除都打印到我的控制台。
tEnv.registerDataStream("AccessLogs", accessLogs, "username, status, request, responseSize, referrer, userAgent, requestTime, ActionTime.rowtime");
Table paths = tEnv.sqlQuery("SELECT request AS path, MAX(requestTime) as lastTime, CASE WHEN MAX(requestTime) < CURRENT_TIMESTAMP - INTERVAL '1' MINUTE THEN 1 ELSE 0 END AS expired FROM AccessLogs GROUP BY request");
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(paths, Row.class);
retractStream .addSink(new PrintSinkFunction<>());
我希望当我访问页面时,会向此流发送一个 Add 事件。然后,如果我等待 1 分钟(什么都不做),表中的 CASE 语句的计算结果将为 1,因此我应该看到一个 Delete,然后是设置了该标志的 Add 事件。
我实际看到的是,在我再次加载该页面之前,什么都不会发生。Delete 事件实际上设置了标志,而紧随其后的 Add 事件再次清除了该标志(因为它不再"过期",因此应该清除)。
// add/delete, path, lastAccess, expired
(true,/mypage,2019-05-20 20:02:48.0,0) // first page load, add event
(false,/mypage,2019-05-20 20:02:48.0,1) // second load > 2 mins later, remove event for the entry with expired flag set
(true,/mypage,2019-05-20 20:05:01.0,0) // second load, add event
编辑:我在搜索中遇到的最有用的提示是创建一个ProcessFunction。我认为这是我可以用我的动态表做的事情(在某些情况下,我最终会用中间流来查看计算的日期),但希望它不必这样做。
我已经让ProcessFunction方法工作,但它需要比我最初想象的更多的修补:
- 我不得不在我的 POJO 中添加一个在 onTimer() 方法中更改的字段(可以是日期或您每次都简单地碰到的版本)
- 我必须将此字段注册为动态表的一部分 我
- 必须在查询中使用此字段,以便重新评估查询并更改布尔标志(即使我实际上并没有使用新字段)。我只是将其添加为我的 SELECT 子句的一部分。
你的方法看起来很有希望,但 Flink 的表 API/SQL 不支持与移动的"现在"时间戳进行比较(尚不支持)。
我将分两步解决这个问题。
- 在 upsert 模式下注册动态表,即根据版本时间戳(在您的情况下
requestTime
)按键(在您的情况下request
)更新插入的表。生成的动态表将保存每个请求的最新行。 - 使用像您这样的简单筛选器谓词进行查询,该谓词比较动态 (upsert) 表行的版本时间戳,并筛选出时间戳太接近现在的所有行。
不幸的是,这两个功能(更新插入转换和与移动的"现在"时间戳的比较)在 Flink 中都不可用。不过,更新插入表转换正在进行一些工作。