从2.5.0版本开始,KafkaAdmin API中有一个方法listOffset
。此方法可以按时间戳返回偏移量。
/**
* Used to retrieve the the earliest offset whose timestamp is greater than
* or equal to the given timestamp in the corresponding partition
* @param timestamp in milliseconds
*/
public static OffsetSpec forTimestamp(long timestamp) {
return new TimestampSpec(timestamp);
}
并且该方法还返回具有字段offset
和timestamp
的对象ListOffsetsResultInfo
。所以我的问题是->API在这两种情况下都提到了什么时间戳?如果是,它是否为KafkaRecord
的时间戳->它能和message.timestamp.type=CreateTime
一起工作吗?还是仅使用message.timestamp.type=LogAppendTime
?
当记录被发送到Kafka时,生产者可以根据message.timestamp.type
指定时间戳或不指定时间戳。如果生产者没有指定时间戳,Kafka会将其设置为当前时间。所以最后,每个Record都有一个时间戳值。
代理根据Kafka记录的时间戳检查OffsetSpec.forTimestamp()
的timestamp
参数。
无论Record的时间戳是来自CreateTime
(由生产者提供)还是来自broker提供的LogAppendTime
,它都适用。