我想使用时间戳偏移,然后尝试kafka.tools.getoffsetshell命令工具。该文档是:https://cwiki.apache.org/confluence/display/kafka/system tools
我认为此命令在我们指定的时间戳之前返回最新的N偏移。但是我尝试了几个命令并感到困惑...
kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list ka1:9092
--time -1
--topic test_topic
--offsets 100
--partitions 61
返回:
test_topic:61:6269917760,6257457002
然后:
kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list ka1:9092
--time -2
--topic test_topic
--offsets 100
--partitions 61
返回:
test_topic:61:6257457002
然后:
kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list ka1:9092
--time 1430742921000
--topic test_topic
--offsets 100
--partitions 61
返回空集!!!
test_topic:61:
此工具如何工作?
kafka将其日志存储在"日志段"中。这些设置在配置中:
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
kafka.tools.getOffsetshell匹配日志段的时间戳,而不是偏移的时间戳。
因此,每个日志段的大小为1073.74兆字节(默认)。因此,如果您的历史记录中的消息将与3,222兆字节相在一起,则您可以通过时间戳查询4个不同的日志段。
这就是为什么:
kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list ka1:9092
--time 1430742921000
--topic test_topic
--offsets 100
--partitions 61
您返回:
test_topic:61:
当您查询时间戳时,在时间轴上,在任何日志段之前,随着工具将日志段的最后一个修改日期匹配到提供的时间戳。
。我通过创建测量日志文件大小的测试并在创建每个新日志后查询时间戳。