状态到期时flink触发



我有一个有趣的用例,我想用flink进行测试。我有Message的传入流,即PASSFAIL。现在,如果该消息是类型FAIL,我有一个下游ProcessFunction,它保存Message状态,然后将pause命令发送到依赖此的所有内容。当我收到与我之前收到的FAIL关联的PASS消息(按消息ID键合)时,我将resume命令发送到我早些时候停止的所有内容。

现在,我计划使用状态TTL到期存储的FAIL状态并在特定超时后恢复所有内容,即使我还没有收到具有相同消息ID的PASS消息。这可以单独使用弗林克(Flink)完成,还是我需要有一些外部计时器将超时消息发送给我的程序?

我有类似的事情可以使它在Flink中工作:

对于每个Message,添加时间戳并将其传递到一个过程函数,该过程函数等待直到current_ts - timestamp == timeout,然后将其发送到恢复模块暂停的所有内容。有更好的方法还是你认为这还可以吗?

似乎更简单地使用计时器到期状态(通过在Ontimer方法中调用状态.clear(),而不是使用状态ttl。相同的Ontimer方法也可以安排同时恢复的内容。

相关内容

  • 没有找到相关文章

最新更新