如何加入国家生存时间(TTL)



在Flink作业中,在将数据保存到数据库中之前,我读取Kafka流并应用一些联接。Kafka主题包含两种类型的数据,所以我首先连接这两个记录,创建一行,并将其保存到数据库中。我需要将最新的数据存储在数据库中。

例如

{"id":1, "item":"1", "amount":300, "type":"revenue"}
{"id":1, "item":"1", "amount":30, "type":"profit"}

我需要表中的数据

id | item | revenue | profit
1  |  1   | 300     | 30

我正在Kafka源之上创建一个createTemporaryView("tableA"(,然后应用下面的连接逻辑

Table tr = tableEnv.sqlQuery("select " +
" coalesce(a.id, b.id) id," +
" coalesce(a.item, b.item) item," +
" a.amount as revenue," +
" b.amount as profit" +
" from " +
" (select * from tableA" +
" where type='revenue') a" +
" full outer join " +
" (select * from tableA" +
" where type='profit') b" +
" on a.id=b.id, a.item=b.item");

最后,将数据保存在表中。它运行良好。燧发枪手保持着联盟的所有状态。如果id=1出现了100次,那么flink会在其状态下维护100个条目。因此,将出现内存问题

Flink对我来说是新的,我们有没有任何设置(TTL(来确保它只保留上次更新的值并删除所有以前的状态?此外,如何配置rockDB设置来为这种状态维护强健的存储。

flink版本:1.13.6

Flink的状态TTL机制对于这个用例没有正确的行为。状态TTL保留了太多的状态,因为它保留了所有最近的状态,而不仅仅是最近的状态。更糟糕的是,当它不再是最近的状态时,它会下降到最近的状态。

Immerok Apache Flink食谱中的一个涵盖了这个案例;请参阅本配方中的流式处理表工作流,了解如何跟踪每个客户的最新交易。这是top-n查询的一个示例,其中n等于1:

SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr DESC AS rownum
FROM table_name)
WHERE rownum = 1

这里的关键是在查询本身中表达约束,以便SQL引擎创建一个只保留必要状态的计划。

免责声明:我为Immerok工作。

最新更新