我有一个有趣的用例,我想用flink进行测试。我有Message
的传入流,即PASS
或FAIL
。现在,如果该消息是类型FAIL
,我有一个下游ProcessFunction
,它保存Message
状态,然后将pause
命令发送到依赖此的所有内容。当我收到与我之前收到的FAIL
关联的PASS
消息(按消息ID键合)时,我将resume
命令发送到我早些时候停止的所有内容。
现在,我计划使用状态TTL到期存储的FAIL
状态并在特定超时后恢复所有内容,即使我还没有收到具有相同消息ID的PASS
消息。这可以单独使用弗林克(Flink)完成,还是我需要有一些外部计时器将超时消息发送给我的程序?
我有类似的事情可以使它在Flink中工作:
对于每个Message
,添加时间戳并将其传递到一个过程函数,该过程函数等待直到current_ts - timestamp == timeout
,然后将其发送到恢复模块暂停的所有内容。有更好的方法还是你认为这还可以吗?
似乎更简单地使用计时器到期状态(通过在Ontimer方法中调用状态.clear(),而不是使用状态ttl。相同的Ontimer方法也可以安排同时恢复的内容。