我正试图在我的一个Kafka处理器中安排一个任务,从本地KeyValueStore (RocksDB)中删除记录。即使到目前为止没有出现异常,也没有任何记录被删除。下面是我的代码:
processorContext.schedule(Duration.ofHours(6), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
store.all().forEachRemaining(keyValue -> {
// CommonUtil.removeOutdatedMessage(store,keyValue.key,keyValue.value.getSentAt());
LocalDateTime sentAt = null;
try {
sentAt = LocalDateTime.parse(keyValue.value.getSentAt(), DateTimeFormatter.ISO_OFFSET_DATE_TIME);
} catch (DateTimeException ex) {
LOGGER.warn("Parsing of date {} failed for message with: {}", keyValue.value.getSentAt(), ex);
}
if (sentAt == null) {
store.delete(keyValue.key);
} else {
boolean isExpired = sentAt.isBefore(LocalDateTime.now().minusDays(Constants.MESSAGE_EXPIRATION_LIMIT));
if (isExpired) {
store.delete(keyValue.key);
}
}
});
});
}
您没有提供任何有关您正在解析的日期值的详细信息。还有,您看到日期解析失败的日志消息了吗?多说一点细节肯定会有帮助。
根据您分享的代码片段,我假设sentAt variable is being set as nonNull
由于异常或其他原因而导致isExpired as false
。
因此,您不会看到任何删除发生。在你的代码中放置调试点来找出对象流。
这是你的java代码的问题,而不是Kafka状态存储。