>问题:我需要找出过去(例如 24 小时(如何发送消息。我有以和状态存储进行查找。
@SendTo(Bindings.MESSAGE_STORE)
@StreamListener(Bindings.MO)
public KStream<?, ?> groupBySender(KStream<String, Message> messages) {
return messages.selectKey((key,message) -> message.from)
.map((k,v) -> new KeyValue<>(k, v.sentAt.toString()))
.groupByKey()
.reduce((oldTimestamp, newTimestamp) -> newTimestamp,
Materialized.as(AggregatorApplication.MESSAGE_STORE))
.toStream();
}
它工作正常
[
"key=123 value=2019-06-21T13:29:05.509Z",
"key=from value=2019-06-21T13:29:05.509Z",
]
所以查找像:
store.get(from);
但我想自动从存储中删除早于 24 小时的条目,目前它们可能会永远保留
有没有更好的方法可以做到这一点? 也许是一些窗口操作之类的?
Atm,KTables
(基本上是键值存储(不支持TTL(参见 https://issues.apache.org/jira/browse/KAFKA-4212(
当前建议是,如果要使数据过期,请使用窗口存储。您可能希望使用自定义.transform()
而不是windowedBy().reduce()
,以获得更大的灵活性。(参看 https://docs.confluent.io/current/streams/developer-guide/processor-api.html(