将条目保留在状态存储中仅定义的时间



>问题:我需要找出过去(例如 24 小时(如何发送消息。我有以和状态存储进行查找。

@SendTo(Bindings.MESSAGE_STORE)
@StreamListener(Bindings.MO)
public KStream<?, ?> groupBySender(KStream<String, Message> messages) {
     return  messages.selectKey((key,message) -> message.from)
                     .map((k,v) -> new KeyValue<>(k, v.sentAt.toString()))
                     .groupByKey()
                     .reduce((oldTimestamp, newTimestamp) -> newTimestamp,
                                Materialized.as(AggregatorApplication.MESSAGE_STORE))
                     .toStream();
}

它工作正常

[
    "key=123 value=2019-06-21T13:29:05.509Z",
    "key=from value=2019-06-21T13:29:05.509Z",
]

所以查找像:

store.get(from);

但我想自动从存储中删除早于 24 小时的条目,目前它们可能会永远保留

有没有更好的方法可以做到这一点? 也许是一些窗口操作之类的?

Atm,KTables(基本上是键值存储(不支持TTL(参见 https://issues.apache.org/jira/browse/KAFKA-4212(

当前建议是,如果要使数据过期,请使用窗口存储。您可能希望使用自定义.transform()而不是windowedBy().reduce(),以获得更大的灵活性。(参看 https://docs.confluent.io/current/streams/developer-guide/processor-api.html(

相关内容

  • 没有找到相关文章

最新更新